This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b4ccb58c25 [flink] Enable limit pushdown and count optimization for dv table (#4712)
b4ccb58c25 is described below
commit b4ccb58c2551d4f9c89728f0bca619cfa16c0edb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 14:13:12 2024 +0800
[flink] Enable limit pushdown and count optimization for dv table (#4712)
---
.../paimon/flink/source/DataTableSource.java | 8 ++--
.../paimon/flink/source/BaseDataTableSource.java | 45 ++++++++++++----------
.../paimon/flink/source/DataTableSource.java | 8 ++--
.../apache/paimon/flink/BatchFileStoreITCase.java | 24 ++++++++++--
.../FileStoreTableStatisticsTestBase.java | 6 +--
.../statistics/PrimaryKeyTableStatisticsTest.java | 2 +-
6 files changed, 57 insertions(+), 36 deletions(-)
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ee00d41832..f41f8da6c8 100644
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -51,7 +51,7 @@ public class DataTableSource extends BaseDataTableSource {
null,
null,
null,
- false);
+ null);
}
public DataTableSource(
@@ -64,7 +64,7 @@ public class DataTableSource extends BaseDataTableSource {
@Nullable int[][] projectFields,
@Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy,
- boolean isBatchCountStar) {
+ @Nullable Long countPushed) {
super(
tableIdentifier,
table,
@@ -75,7 +75,7 @@ public class DataTableSource extends BaseDataTableSource {
projectFields,
limit,
watermarkStrategy,
- isBatchCountStar);
+ countPushed);
}
@Override
@@ -90,7 +90,7 @@ public class DataTableSource extends BaseDataTableSource {
projectFields,
limit,
watermarkStrategy,
- isBatchCountStar);
+ countPushed);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 5dbbdcedd8..a94d799773 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
-import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -36,7 +35,8 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -74,6 +74,7 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
@@ -98,7 +99,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Nullable protected final LogStoreTableFactory logStoreTableFactory;
@Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
- protected boolean isBatchCountStar;
+ @Nullable protected Long countPushed;
public BaseDataTableSource(
ObjectIdentifier tableIdentifier,
@@ -110,7 +111,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Nullable int[][] projectFields,
@Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy,
- boolean isBatchCountStar) {
+ @Nullable Long countPushed) {
super(table, predicate, projectFields, limit);
this.tableIdentifier = tableIdentifier;
this.streaming = streaming;
@@ -120,7 +121,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
this.projectFields = projectFields;
this.limit = limit;
this.watermarkStrategy = watermarkStrategy;
- this.isBatchCountStar = isBatchCountStar;
+ this.countPushed = countPushed;
}
@Override
@@ -159,7 +160,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- if (isBatchCountStar) {
+ if (countPushed != null) {
return createCountStarScan();
}
@@ -212,10 +213,8 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
}
private ScanRuntimeProvider createCountStarScan() {
- TableScan scan = table.newReadBuilder().withFilter(predicate).newScan();
- List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
- long rowCount = partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum();
- NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount, rowCount);
+ checkNotNull(countPushed);
+ NumberSequenceRowSource source = new NumberSequenceRowSource(countPushed, countPushed);
return new SourceProvider() {
@Override
public Source<RowData, ?, ?> createSource() {
@@ -303,15 +302,6 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
return false;
}
- if (!table.primaryKeys().isEmpty()) {
- return false;
- }
-
- CoreOptions options = ((DataTable) table).coreOptions();
- if (options.deletionVectorsEnabled()) {
- return false;
- }
-
if (groupingSets.size() != 1) {
return false;
}
@@ -334,7 +324,22 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
return false;
}
- isBatchCountStar = true;
+ List<Split> splits =
+ table.newReadBuilder().dropStats().withFilter(predicate).newScan().plan().splits();
+ long countPushed = 0;
+ for (Split s : splits) {
+ if (!(s instanceof DataSplit)) {
+ return false;
+ }
+ DataSplit split = (DataSplit) s;
+ if (!split.mergedRowCountAvailable()) {
+ return false;
+ }
+
+ countPushed += split.mergedRowCount();
+ }
+
+ this.countPushed = countPushed;
return true;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 53a1b5f630..2b470cb438 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -70,7 +70,7 @@ public class DataTableSource extends BaseDataTableSource
null,
null,
null,
- false);
+ null);
}
public DataTableSource(
@@ -84,7 +84,7 @@ public class DataTableSource extends BaseDataTableSource
@Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy,
@Nullable List<String> dynamicPartitionFilteringFields,
- boolean isBatchCountStar) {
+ @Nullable Long countPushed) {
super(
tableIdentifier,
table,
@@ -95,7 +95,7 @@ public class DataTableSource extends BaseDataTableSource
projectFields,
limit,
watermarkStrategy,
- isBatchCountStar);
+ countPushed);
this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
}
@@ -112,7 +112,7 @@ public class DataTableSource extends BaseDataTableSource
limit,
watermarkStrategy,
dynamicPartitionFilteringFields,
- isBatchCountStar);
+ countPushed);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index cdc114b048..d48b6e7712 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -561,19 +561,35 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
String sql = "SELECT COUNT(*) FROM count_append_dv";
assertThat(sql(sql)).containsOnly(Row.of(2L));
- validateCount1NotPushDown(sql);
+ validateCount1PushDown(sql);
}
@Test
public void testCountStarPK() {
- sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
- sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')");
+ sql(
+ "CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro')");
+ sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+ sql("INSERT INTO count_pk VALUES (1, 'e')");
String sql = "SELECT COUNT(*) FROM count_pk";
- assertThat(sql(sql)).containsOnly(Row.of(2L));
+ assertThat(sql(sql)).containsOnly(Row.of(4L));
validateCount1NotPushDown(sql);
}
+ @Test
+ public void testCountStarPKDv() {
+ sql(
+ "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ("
+ + "'file.format' = 'avro', "
+ + "'deletion-vectors.enabled' = 'true')");
+ sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+ sql("INSERT INTO count_pk_dv VALUES (1, 'e')");
+
+ String sql = "SELECT COUNT(*) FROM count_pk_dv";
+ assertThat(sql(sql)).containsOnly(Row.of(4L));
+ validateCount1PushDown(sql);
+ }
+
@Test
public void testParquetRowDecimalAndTimestamp() {
sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 826bf28d12..42a47ea1e2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -153,7 +153,7 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- false);
+ null);
Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
Map<String, ColStats<?>> colStatsMap = new HashMap<>();
colStatsMap.put("pt", ColStats.newColStats(0, 1L, 1, 2, 0L, null,
null));
@@ -232,7 +232,7 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- false);
+ null);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
Map<String, ColStats<?>> colStatsMap = new HashMap<>();
colStatsMap.put("pt", ColStats.newColStats(0, 1L, 2, 2, 0L, null,
null));
@@ -311,7 +311,7 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- false);
+ null);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
Map<String, ColStats<?>> colStatsMap = new HashMap<>();
colStatsMap.put("pt", ColStats.newColStats(0, 4L, 2, 2, 0L, null,
null));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index f5d4121672..ea47df2d9d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -52,7 +52,7 @@ public class PrimaryKeyTableStatisticsTest extends
FileStoreTableStatisticsTestB
null,
null,
null,
- false);
+ null);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
// TODO validate column statistics
}