This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2e2a214985 [core] Drop stats for deleted data files to reduce memory
(#4629)
2e2a214985 is described below
commit 2e2a214985f4eef3bda4c7505ec53be1ec278974
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 4 16:20:01 2024 +0800
[core] Drop stats for deleted data files to reduce memory (#4629)
---
.../UnawareAppendTableCompactionCoordinator.java | 2 ++
.../paimon/operation/AbstractFileStoreWrite.java | 3 ++-
.../org/apache/paimon/stats/SimpleStatsEvolution.java | 18 +++++++++++++++---
.../paimon/flink/lookup/LookupDataTableScan.java | 1 +
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 1 +
.../apache/paimon/flink/service/QueryFileMonitor.java | 2 +-
.../apache/paimon/spark/commands/PaimonCommand.scala | 3 ++-
7 files changed, 24 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 577f28d0f5..5e43568aac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -380,6 +380,8 @@ public class UnawareAppendTableCompactionCoordinator {
if (filter != null) {
snapshotReader.withFilter(filter);
}
+ // drop stats to reduce memory
+ snapshotReader.dropStats();
this.streamingMode = isStreaming;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index d638870300..43957de8d6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -101,7 +101,8 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
int writerNumberMax,
boolean legacyPartitionName) {
this.snapshotManager = snapshotManager;
- this.scan = scan;
+ // Statistics are useless in the writer
+ this.scan = scan == null ? null : scan.dropStats();
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
this.totalBuckets = totalBuckets;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
index d3f6d4cd62..079300a89d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
+++
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
@@ -22,6 +22,8 @@ import org.apache.paimon.casting.CastFieldGetter;
import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
@@ -33,9 +35,9 @@ import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/** Converter for array of {@link SimpleColStats}. */
public class SimpleStatsEvolution {
@@ -46,6 +48,9 @@ public class SimpleStatsEvolution {
private final Map<List<String>, int[]> indexMappings;
+ private final GenericRow emptyValues;
+ private final GenericArray emptyNullCounts;
+
public SimpleStatsEvolution(
RowType rowType,
@Nullable int[] indexMapping,
@@ -53,7 +58,9 @@ public class SimpleStatsEvolution {
this.fieldNames = rowType.getFieldNames();
this.indexMapping = indexMapping;
this.castFieldGetters = castFieldGetters;
- this.indexMappings = new HashMap<>();
+ this.indexMappings = new ConcurrentHashMap<>();
+ this.emptyValues = new GenericRow(fieldNames.size());
+ this.emptyNullCounts = new GenericArray(new Object[fieldNames.size()]);
}
public Result evolution(
@@ -62,7 +69,12 @@ public class SimpleStatsEvolution {
InternalRow maxValues = stats.maxValues();
InternalArray nullCounts = stats.nullCounts();
- if (denseFields != null) {
+ if (denseFields != null && denseFields.isEmpty()) {
+ // optimize for empty dense fields
+ minValues = emptyValues;
+ maxValues = emptyValues;
+ nullCounts = emptyNullCounts;
+ } else if (denseFields != null) {
int[] denseIndexMapping =
indexMappings.computeIfAbsent(
denseFields,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index 48cb64e70b..f43d80321e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -59,6 +59,7 @@ public class LookupDataTableScan extends DataTableStreamScan {
defaultValueAssigner);
this.startupMode = options.startupMode();
this.lookupScanMode = lookupScanMode;
+ dropStats();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index ef5543ac9b..7bd7a652b5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -207,6 +207,7 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
this.scan =
table.newReadBuilder()
+ .dropStats()
.withFilter(filter)
.withBucketFilter(
requireCachedBucketIds == null
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 02f8a65411..b9776786fa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -83,7 +83,7 @@ public class QueryFileMonitor extends
RichSourceFunction<InternalRow> {
*/
public void open(Configuration parameters) throws Exception {
FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable)
table);
- ReadBuilder readBuilder = monitorTable.newReadBuilder();
+ ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats();
this.scan = readBuilder.newStreamScan();
this.read = readBuilder.newRead();
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 191d7a766b..466643b157 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -94,7 +94,8 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
condition: Expression,
output: Seq[Attribute]): Seq[DataSplit] = {
// low level snapshot reader, it can not be affected by 'scan.mode'
- val snapshotReader = table.newSnapshotReader()
+ // drop stats after filter pushdown
+ val snapshotReader = table.newSnapshotReader().dropStats()
if (condition != TrueLiteral) {
val filter =
convertConditionToPaimonPredicate(condition, output, rowType,
ignoreFailure = true)