This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ccb58c25 [flink] Enable limit pushdown and count optimization for dv table (#4712)
b4ccb58c25 is described below

commit b4ccb58c2551d4f9c89728f0bca619cfa16c0edb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 14:13:12 2024 +0800

    [flink] Enable limit pushdown and count optimization for dv table (#4712)
---
 .../paimon/flink/source/DataTableSource.java       |  8 ++--
 .../paimon/flink/source/BaseDataTableSource.java   | 45 ++++++++++++----------
 .../paimon/flink/source/DataTableSource.java       |  8 ++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 24 ++++++++++--
 .../FileStoreTableStatisticsTestBase.java          |  6 +--
 .../statistics/PrimaryKeyTableStatisticsTest.java  |  2 +-
 6 files changed, 57 insertions(+), 36 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ee00d41832..f41f8da6c8 100644
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -51,7 +51,7 @@ public class DataTableSource extends BaseDataTableSource {
                 null,
                 null,
                 null,
-                false);
+                null);
     }
 
     public DataTableSource(
@@ -64,7 +64,7 @@ public class DataTableSource extends BaseDataTableSource {
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(
                 tableIdentifier,
                 table,
@@ -75,7 +75,7 @@ public class DataTableSource extends BaseDataTableSource {
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
     }
 
     @Override
@@ -90,7 +90,7 @@ public class DataTableSource extends BaseDataTableSource {
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 5dbbdcedd8..a94d799773 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
 import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -36,7 +35,8 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -74,6 +74,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
  * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
@@ -98,7 +99,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
     @Nullable protected final LogStoreTableFactory logStoreTableFactory;
 
     @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
-    protected boolean isBatchCountStar;
+    @Nullable protected Long countPushed;
 
     public BaseDataTableSource(
             ObjectIdentifier tableIdentifier,
@@ -110,7 +111,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
         this.streaming = streaming;
@@ -120,7 +121,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
         this.projectFields = projectFields;
         this.limit = limit;
         this.watermarkStrategy = watermarkStrategy;
-        this.isBatchCountStar = isBatchCountStar;
+        this.countPushed = countPushed;
     }
 
     @Override
@@ -159,7 +160,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        if (isBatchCountStar) {
+        if (countPushed != null) {
             return createCountStarScan();
         }
 
@@ -212,10 +213,8 @@ public abstract class BaseDataTableSource extends FlinkTableSource
     }
 
     private ScanRuntimeProvider createCountStarScan() {
-        TableScan scan = table.newReadBuilder().withFilter(predicate).newScan();
-        List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
-        long rowCount = partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum();
-        NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount, rowCount);
+        checkNotNull(countPushed);
+        NumberSequenceRowSource source = new NumberSequenceRowSource(countPushed, countPushed);
         return new SourceProvider() {
             @Override
             public Source<RowData, ?, ?> createSource() {
@@ -303,15 +302,6 @@ public abstract class BaseDataTableSource extends FlinkTableSource
             return false;
         }
 
-        if (!table.primaryKeys().isEmpty()) {
-            return false;
-        }
-
-        CoreOptions options = ((DataTable) table).coreOptions();
-        if (options.deletionVectorsEnabled()) {
-            return false;
-        }
-
         if (groupingSets.size() != 1) {
             return false;
         }
@@ -334,7 +324,22 @@ public abstract class BaseDataTableSource extends FlinkTableSource
             return false;
         }
 
-        isBatchCountStar = true;
+        List<Split> splits =
+                table.newReadBuilder().dropStats().withFilter(predicate).newScan().plan().splits();
+        long countPushed = 0;
+        for (Split s : splits) {
+            if (!(s instanceof DataSplit)) {
+                return false;
+            }
+            DataSplit split = (DataSplit) s;
+            if (!split.mergedRowCountAvailable()) {
+                return false;
+            }
+
+            countPushed += split.mergedRowCount();
+        }
+
+        this.countPushed = countPushed;
         return true;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 53a1b5f630..2b470cb438 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -70,7 +70,7 @@ public class DataTableSource extends BaseDataTableSource
                 null,
                 null,
                 null,
-                false);
+                null);
     }
 
     public DataTableSource(
@@ -84,7 +84,7 @@ public class DataTableSource extends BaseDataTableSource
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
             @Nullable List<String> dynamicPartitionFilteringFields,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(
                 tableIdentifier,
                 table,
@@ -95,7 +95,7 @@ public class DataTableSource extends BaseDataTableSource
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
         this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }
 
@@ -112,7 +112,7 @@ public class DataTableSource extends BaseDataTableSource
                 limit,
                 watermarkStrategy,
                 dynamicPartitionFilteringFields,
-                isBatchCountStar);
+                countPushed);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index cdc114b048..d48b6e7712 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -561,19 +561,35 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
 
         String sql = "SELECT COUNT(*) FROM count_append_dv";
         assertThat(sql(sql)).containsOnly(Row.of(2L));
-        validateCount1NotPushDown(sql);
+        validateCount1PushDown(sql);
     }
 
     @Test
     public void testCountStarPK() {
-        sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
-        sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')");
+        sql(
+                "CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro')");
+        sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+        sql("INSERT INTO count_pk VALUES (1, 'e')");
 
         String sql = "SELECT COUNT(*) FROM count_pk";
-        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        assertThat(sql(sql)).containsOnly(Row.of(4L));
         validateCount1NotPushDown(sql);
     }
 
+    @Test
+    public void testCountStarPKDv() {
+        sql(
+                "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ("
+                        + "'file.format' = 'avro', "
+                        + "'deletion-vectors.enabled' = 'true')");
+        sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+        sql("INSERT INTO count_pk_dv VALUES (1, 'e')");
+
+        String sql = "SELECT COUNT(*) FROM count_pk_dv";
+        assertThat(sql(sql)).containsOnly(Row.of(4L));
+        validateCount1PushDown(sql);
+    }
+
     @Test
     public void testParquetRowDecimalAndTimestamp() {
         sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 826bf28d12..42a47ea1e2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -153,7 +153,7 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 1L, 1, 2, 0L, null, null));
@@ -232,7 +232,7 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 1L, 2, 2, 0L, null, null));
@@ -311,7 +311,7 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 4L, 2, 2, 0L, null, null));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index f5d4121672..ea47df2d9d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -52,7 +52,7 @@ public class PrimaryKeyTableStatisticsTest extends FileStoreTableStatisticsTestB
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
         // TODO validate column statistics
     }

Reply via email to