This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2a214985 [core] Drop stats for deleted data files to reduce memory (#4629)
2e2a214985 is described below

commit 2e2a214985f4eef3bda4c7505ec53be1ec278974
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 4 16:20:01 2024 +0800

    [core] Drop stats for deleted data files to reduce memory (#4629)
---
 .../UnawareAppendTableCompactionCoordinator.java       |  2 ++
 .../paimon/operation/AbstractFileStoreWrite.java       |  3 ++-
 .../org/apache/paimon/stats/SimpleStatsEvolution.java  | 18 +++++++++++++++---
 .../paimon/flink/lookup/LookupDataTableScan.java       |  1 +
 .../flink/lookup/PrimaryKeyPartialLookupTable.java     |  1 +
 .../apache/paimon/flink/service/QueryFileMonitor.java  |  2 +-
 .../apache/paimon/spark/commands/PaimonCommand.scala   |  3 ++-
 7 files changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 577f28d0f5..5e43568aac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -380,6 +380,8 @@ public class UnawareAppendTableCompactionCoordinator {
             if (filter != null) {
                 snapshotReader.withFilter(filter);
             }
+            // drop stats to reduce memory
+            snapshotReader.dropStats();
             this.streamingMode = isStreaming;
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index d638870300..43957de8d6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -101,7 +101,8 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             int writerNumberMax,
             boolean legacyPartitionName) {
         this.snapshotManager = snapshotManager;
-        this.scan = scan;
+        // Statistic is useless in writer
+        this.scan = scan == null ? null : scan.dropStats();
         this.indexFactory = indexFactory;
         this.dvMaintainerFactory = dvMaintainerFactory;
         this.totalBuckets = totalBuckets;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
index d3f6d4cd62..079300a89d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
@@ -22,6 +22,8 @@ import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
@@ -33,9 +35,9 @@ import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Converter for array of {@link SimpleColStats}. */
 public class SimpleStatsEvolution {
@@ -46,6 +48,9 @@ public class SimpleStatsEvolution {
 
     private final Map<List<String>, int[]> indexMappings;
 
+    private final GenericRow emptyValues;
+    private final GenericArray emptyNullCounts;
+
     public SimpleStatsEvolution(
             RowType rowType,
             @Nullable int[] indexMapping,
@@ -53,7 +58,9 @@ public class SimpleStatsEvolution {
         this.fieldNames = rowType.getFieldNames();
         this.indexMapping = indexMapping;
         this.castFieldGetters = castFieldGetters;
-        this.indexMappings = new HashMap<>();
+        this.indexMappings = new ConcurrentHashMap<>();
+        this.emptyValues = new GenericRow(fieldNames.size());
+        this.emptyNullCounts = new GenericArray(new Object[fieldNames.size()]);
     }
 
     public Result evolution(
@@ -62,7 +69,12 @@ public class SimpleStatsEvolution {
         InternalRow maxValues = stats.maxValues();
         InternalArray nullCounts = stats.nullCounts();
 
-        if (denseFields != null) {
+        if (denseFields != null && denseFields.isEmpty()) {
+            // optimize for empty dense fields
+            minValues = emptyValues;
+            maxValues = emptyValues;
+            nullCounts = emptyNullCounts;
+        } else if (denseFields != null) {
             int[] denseIndexMapping =
                     indexMappings.computeIfAbsent(
                             denseFields,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index 48cb64e70b..f43d80321e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -59,6 +59,7 @@ public class LookupDataTableScan extends DataTableStreamScan {
                 defaultValueAssigner);
         this.startupMode = options.startupMode();
         this.lookupScanMode = lookupScanMode;
+        dropStats();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index ef5543ac9b..7bd7a652b5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -207,6 +207,7 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
 
             this.scan =
                     table.newReadBuilder()
+                            .dropStats()
                             .withFilter(filter)
                             .withBucketFilter(
                                     requireCachedBucketIds == null
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 02f8a65411..b9776786fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -83,7 +83,7 @@ public class QueryFileMonitor extends 
RichSourceFunction<InternalRow> {
      */
     public void open(Configuration parameters) throws Exception {
         FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) 
table);
-        ReadBuilder readBuilder = monitorTable.newReadBuilder();
+        ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats();
         this.scan = readBuilder.newStreamScan();
         this.read = readBuilder.newRead();
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 191d7a766b..466643b157 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -94,7 +94,8 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper with SQLCon
       condition: Expression,
       output: Seq[Attribute]): Seq[DataSplit] = {
     // low level snapshot reader, it can not be affected by 'scan.mode'
-    val snapshotReader = table.newSnapshotReader()
+    // dropStats after filter push down
+    val snapshotReader = table.newSnapshotReader().dropStats()
     if (condition != TrueLiteral) {
       val filter =
         convertConditionToPaimonPredicate(condition, output, rowType, 
ignoreFailure = true)

Reply via email to