This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 63ef2fa7d [flink] Skip listPartitions scan for
FlinkRecomputeStatisticsProgram (#3261)
63ef2fa7d is described below
commit 63ef2fa7d13b5736a260b0972ed69b3a1146509f
Author: Wencong Liu <[email protected]>
AuthorDate: Thu Apr 25 17:11:43 2024 +0800
[flink] Skip listPartitions scan for FlinkRecomputeStatisticsProgram (#3261)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 24 ++++++++++++++++++++++
1 file changed, 24 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 71fbf0fa6..90effe57f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -85,6 +85,8 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -125,6 +127,9 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkCatalog.class);
+
private final ClassLoader classLoader;
private final Catalog catalog;
@@ -822,6 +827,15 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public final List<CatalogPartitionSpec> listPartitions(ObjectPath
tablePath)
throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ // The method is skipped if it is being called as part of
FlinkRecomputeStatisticsProgram,
+ // since this program scans the entire table and all its partitions,
which is a
+ // time-consuming operation. By returning an empty result, we can
prompt the FlinkPlanner to
+ // use the FlinkTableSource#reportStatistics method to gather the
necessary statistics.
+ if (isCalledFromFlinkRecomputeStatisticsProgram()) {
+ LOG.info(
+ "Skipping listPartitions method due to detection of
FlinkRecomputeStatisticsProgram call.");
+ return Collections.emptyList();
+ }
return getPartitionSpecs(tablePath, null);
}
@@ -1057,4 +1071,14 @@ public class FlinkCatalog extends AbstractCatalog {
return ProcedureUtil.getProcedure(catalog, procedurePath)
.orElseThrow(() -> new ProcedureNotExistException(name,
procedurePath));
}
+
+ private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
+ StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
+ for (StackTraceElement stackTraceElement : stackTrace) {
+ if
(stackTraceElement.getClassName().contains("FlinkRecomputeStatisticsProgram")) {
+ return true;
+ }
+ }
+ return false;
+ }
}