This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d28d8f17 [core] Fix Flink batch reads the paimon partition table jobManager oom (#3245)
3d28d8f17 is described below

commit 3d28d8f17a401bf2829a8a6dafb7118b217a8755
Author: BeiWeiDeDengDai <[email protected]>
AuthorDate: Mon Apr 22 18:54:46 2024 +0800

    [core] Fix Flink batch reads the paimon partition table jobManager oom (#3245)
---
 .../table/source/snapshot/SnapshotReaderImpl.java      | 17 ++++-------------
 .../java/org/apache/paimon/flink/FlinkCatalog.java     | 18 ++----------------
 2 files changed, 6 insertions(+), 29 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index cb3c9fc7e..b21010469 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -29,7 +29,7 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
@@ -309,18 +309,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public List<BinaryRow> partitions() {
-        List<ManifestEntry> entryList = scan.plan().files();
-
-        return entryList.stream()
-                .collect(
-                        Collectors.groupingBy(
-                                ManifestEntry::partition,
-                                LinkedHashMap::new,
-                                Collectors.reducing((a, b) -> b)))
-                .values()
-                .stream()
-                .map(Optional::get)
-                .map(ManifestEntry::partition)
+        return scan.readSimpleEntries().stream()
+                .map(SimpleFileEntry::partition)
+                .distinct()
                 .collect(Collectors.toList());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 98ca840f0..71fbf0fa6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -30,9 +30,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataPartitionComputer;
@@ -841,22 +839,10 @@ public class FlinkCatalog extends AbstractCatalog {
             }
 
             ReadBuilder readBuilder = table.newReadBuilder();
-            List<Split> splits;
             if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) {
-                splits =
-                        readBuilder
-                                .withPartitionFilter(partitionSpec.getPartitionSpec())
-                                .newScan()
-                                .plan()
-                                .splits();
-            } else {
-                splits = readBuilder.newScan().plan().splits();
+                readBuilder.withPartitionFilter(partitionSpec.getPartitionSpec());
             }
-
-            List<BinaryRow> partitions =
-                    splits.stream()
-                            .map(m -> ((DataSplit) m).partition())
-                            .collect(Collectors.toList());
+            List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
             org.apache.paimon.types.RowType partitionRowType =
                     fileStoreTable.schema().logicalPartitionType();
 

Reply via email to