This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ef2fa7d [flink] Skip listPartitions scan for 
FlinkRecomputeStatisticsProgram (#3261)
63ef2fa7d is described below

commit 63ef2fa7d13b5736a260b0972ed69b3a1146509f
Author: Wencong Liu <[email protected]>
AuthorDate: Thu Apr 25 17:11:43 2024 +0800

    [flink] Skip listPartitions scan for FlinkRecomputeStatisticsProgram (#3261)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 71fbf0fa6..90effe57f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -85,6 +85,8 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -125,6 +127,9 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Catalog for paimon. */
 public class FlinkCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalog.class);
+
     private final ClassLoader classLoader;
 
     private final Catalog catalog;
@@ -822,6 +827,15 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public final List<CatalogPartitionSpec> listPartitions(ObjectPath 
tablePath)
             throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        // The method is skipped if it is being called as part of 
FlinkRecomputeStatisticsProgram,
+        // since this program scans the entire table and all its partitions, 
which is a
+        // time-consuming operation. By returning an empty result, we can 
prompt the FlinkPlanner to
+        // use the FlinkTableSource#reportStatistics method to gather the 
necessary statistics.
+        if (isCalledFromFlinkRecomputeStatisticsProgram()) {
+            LOG.info(
+                    "Skipping listPartitions method due to detection of 
FlinkRecomputeStatisticsProgram call.");
+            return Collections.emptyList();
+        }
         return getPartitionSpecs(tablePath, null);
     }
 
@@ -1057,4 +1071,14 @@ public class FlinkCatalog extends AbstractCatalog {
         return ProcedureUtil.getProcedure(catalog, procedurePath)
                 .orElseThrow(() -> new ProcedureNotExistException(name, 
procedurePath));
     }
+
+    private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
+        StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
+        for (StackTraceElement stackTraceElement : stackTrace) {
+            if 
(stackTraceElement.getClassName().contains("FlinkRecomputeStatisticsProgram")) {
+                return true;
+            }
+        }
+        return false;
+    }
 }

Reply via email to