This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new daeb886448 [flink] dropStats in dedicated compactor jobs (#5109)
daeb886448 is described below
commit daeb886448b37e8d2d462b9659fb8676be98b107
Author: wangwj <[email protected]>
AuthorDate: Mon Feb 24 20:45:22 2025 +0800
[flink] dropStats in dedicated compactor jobs (#5109)
---
.../paimon/flink/source/CompactorSourceBuilder.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 141f45ecd1..2753ea7779 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -86,26 +86,31 @@ public class CompactorSourceBuilder {
}
private Source<RowData, ?, ?> buildSource(CompactBucketsTable
compactBucketsTable) {
-
if (isContinuous) {
compactBucketsTable =
compactBucketsTable.copy(streamingCompactOptions());
return new ContinuousFileStoreSource(
-
compactBucketsTable.newReadBuilder().withFilter(partitionPredicate),
- compactBucketsTable.options(),
- null);
+ getReadBuilder(compactBucketsTable),
compactBucketsTable.options(), null);
} else {
compactBucketsTable =
compactBucketsTable.copy(batchCompactOptions());
- ReadBuilder readBuilder =
-
compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
Options options =
compactBucketsTable.coreOptions().toConfiguration();
+
return new StaticFileStoreSource(
- readBuilder,
+ getReadBuilder(compactBucketsTable),
null,
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE));
}
}
+ private ReadBuilder getReadBuilder(CompactBucketsTable
compactBucketsTable) {
+ ReadBuilder readBuilder =
+
compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+ if
(CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
+ readBuilder.dropStats();
+ }
+ return readBuilder;
+ }
+
public DataStreamSource<RowData> build() {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment
should not be null.");