This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new daeb886448 [flink] dropStats in dedicated compactor jobs (#5109)
daeb886448 is described below

commit daeb886448b37e8d2d462b9659fb8676be98b107
Author: wangwj <[email protected]>
AuthorDate: Mon Feb 24 20:45:22 2025 +0800

    [flink] dropStats in dedicated compactor jobs (#5109)
---
 .../paimon/flink/source/CompactorSourceBuilder.java   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 141f45ecd1..2753ea7779 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -86,26 +86,31 @@ public class CompactorSourceBuilder {
     }
 
     private Source<RowData, ?, ?> buildSource(CompactBucketsTable compactBucketsTable) {
-
         if (isContinuous) {
             compactBucketsTable = compactBucketsTable.copy(streamingCompactOptions());
             return new ContinuousFileStoreSource(
-                    compactBucketsTable.newReadBuilder().withFilter(partitionPredicate),
-                    compactBucketsTable.options(),
-                    null);
+                    getReadBuilder(compactBucketsTable), compactBucketsTable.options(), null);
         } else {
             compactBucketsTable = compactBucketsTable.copy(batchCompactOptions());
-            ReadBuilder readBuilder =
-                    compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
             Options options = compactBucketsTable.coreOptions().toConfiguration();
+
             return new StaticFileStoreSource(
-                    readBuilder,
+                    getReadBuilder(compactBucketsTable),
                     null,
                     options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
                     options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE));
         }
     }
 
+    private ReadBuilder getReadBuilder(CompactBucketsTable compactBucketsTable) {
+        ReadBuilder readBuilder =
+                compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+        if (CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
+            readBuilder.dropStats();
+        }
+        return readBuilder;
+    }
+
+
     public DataStreamSource<RowData> build() {
         if (env == null) {
             throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");

Reply via email to