This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d23c7da2 [core] limit parallelly read file memory usage, extract some 
methods (#1072)
5d23c7da2 is described below

commit 5d23c7da2ae4973c7d70f9e89a4b83220264142c
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 8 14:02:28 2023 +0800

    [core] limit parallelly read file memory usage, extract some methods (#1072)
---
 .../shortcodes/generated/core_configuration.html   |  18 ++-
 .../org/apache/paimon/AppendOnlyFileStore.java     |   3 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  13 ++
 .../java/org/apache/paimon/KeyValueFileStore.java  |   3 +-
 ...nifestEntry.java => AbstractManifestEntry.java} |  93 +++++-------
 .../org/apache/paimon/manifest/ManifestEntry.java  | 122 +---------------
 .../paimon/operation/AbstractFileStoreScan.java    | 100 +++++++------
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   6 +-
 .../paimon/operation/KeyValueFileStoreScan.java    |   6 +-
 .../paimon/utils/ParallellyExecuteUtils.java       |  86 +++++++++++
 .../paimon/utils/ParallellyExecuteUtilsTest.java   | 162 +++++++++++++++++++++
 11 files changed, 382 insertions(+), 230 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index edf14f17f..25c057926 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -194,12 +194,6 @@
             <td><p>Enum</p></td>
             <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li></ul></td>
         </tr>
-        <tr>
-            <td><h5>sort-engine</h5></td>
-            <td style="word-wrap: break-word;">loser-tree</td>
-            <td><p>Enum</p></td>
-            <td>Specify the sort engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway 
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared 
with heapsort, loser-tree has fewer comparisons and is more 
efficient.</li></ul></td>
-        </tr>
         <tr>
             <td><h5>num-levels</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -296,6 +290,12 @@
             <td>Long</td>
             <td>End condition "watermark" for bounded streaming mode. Stream 
reading will end when a larger watermark snapshot is encountered.</td>
         </tr>
+        <tr>
+            <td><h5>scan.manifest.parallelism</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The parallelism of scanning manifest files, default value is 
the size of cpu processor.Note: Scale-up this parameter will increase memory 
usage while scanning manifest files.We can consider downsize it when we 
encounter an out of memory exception while scanning</td>
+        </tr>
         <tr>
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
@@ -344,6 +344,12 @@
             <td>Duration</td>
             <td>The maximum time of completed snapshots to retain.</td>
         </tr>
+        <tr>
+            <td><h5>sort-engine</h5></td>
+            <td style="word-wrap: break-word;">loser-tree</td>
+            <td><p>Enum</p></td>
+            <td>Specify the sort engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway 
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared 
with heapsort, loser-tree has fewer comparisons and is more 
efficient.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>source.split.open-file-cost</h5></td>
             <td style="word-wrap: break-word;">4 mb</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 8e28b5b2d..41abd3cae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -96,7 +96,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 manifestFileFactory(forWrite),
                 manifestListFactory(forWrite),
                 options.bucket(),
-                forWrite);
+                forWrite,
+                options.scanManifestParallelism());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 0405d021f..d5cb05528 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -387,6 +387,15 @@ public class CoreOptions implements Serializable {
                             "End condition \"watermark\" for bounded streaming 
mode. Stream"
                                     + " reading will end when a larger 
watermark snapshot is encountered.");
 
+    public static final ConfigOption<Integer> SCAN_MANIFEST_PARALLELISM =
+            key("scan.manifest.parallelism")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The parallelism of scanning manifest files, 
default value is the size of cpu processor."
+                                    + "Note: Scale-up this parameter will 
increase memory usage while scanning manifest files."
+                                    + "We can consider downsize it when we 
encounter an out of memory exception while scanning");
+
     public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
             key("log.consistency")
                     .enumType(LogConsistency.class)
@@ -844,6 +853,10 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_SNAPSHOT_ID);
     }
 
+    public Integer scanManifestParallelism() {
+        return options.get(SCAN_MANIFEST_PARALLELISM);
+    }
+
     public Optional<String> sequenceField() {
         return options.getOptional(SEQUENCE_FIELD);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 77fbd5d76..e1e7819b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -121,7 +121,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 manifestFileFactory(forWrite),
                 manifestListFactory(forWrite),
                 options.bucket(),
-                forWrite);
+                forWrite,
+                options.scanManifestParallelism());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
similarity index 65%
copy from 
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
copy to 
paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
index 84af10819..cdd54cc0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
@@ -19,40 +19,37 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
-
-/** Entry of a manifest file, representing an addition / deletion of a data 
file. */
-public class ManifestEntry {
-
-    private final FileKind kind;
+/** Abstract a simplest model of manifest file. */
+public abstract class AbstractManifestEntry {
+    protected final FileKind kind;
+    protected final String fileName;
     // for tables without partition this field should be a row with 0 columns 
(not null)
-    private final BinaryRow partition;
-    private final int bucket;
-    private final int totalBuckets;
-    private final DataFileMeta file;
-
-    public ManifestEntry(
-            FileKind kind, BinaryRow partition, int bucket, int totalBuckets, 
DataFileMeta file) {
+    protected final BinaryRow partition;
+    protected final int bucket;
+    protected final int totalBuckets;
+    protected final int level;
+
+    public AbstractManifestEntry(
+            FileKind kind,
+            String fileName,
+            BinaryRow partition,
+            int bucket,
+            int totalBuckets,
+            int level) {
         this.kind = kind;
+        this.fileName = fileName;
         this.partition = partition;
         this.bucket = bucket;
         this.totalBuckets = totalBuckets;
-        this.file = file;
+        this.level = level;
     }
 
     public FileKind kind() {
@@ -71,57 +68,48 @@ public class ManifestEntry {
         return totalBuckets;
     }
 
-    public DataFileMeta file() {
-        return file;
+    public int level() {
+        return level;
     }
 
     public Identifier identifier() {
-        return new Identifier(partition, bucket, file.level(), 
file.fileName());
-    }
-
-    public static RowType schema() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
-        fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
-        fields.add(new DataField(2, "_BUCKET", new IntType(false)));
-        fields.add(new DataField(3, "_TOTAL_BUCKETS", new IntType(false)));
-        fields.add(new DataField(4, "_FILE", DataFileMeta.schema()));
-        return new RowType(fields);
+        return new Identifier(partition, bucket, level, fileName);
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof ManifestEntry)) {
+        if (!(o instanceof AbstractManifestEntry)) {
             return false;
         }
-        ManifestEntry that = (ManifestEntry) o;
+        AbstractManifestEntry that = (AbstractManifestEntry) o;
         return Objects.equals(kind, that.kind)
                 && Objects.equals(partition, that.partition)
                 && bucket == that.bucket
-                && totalBuckets == that.totalBuckets
-                && Objects.equals(file, that.file);
+                && level == that.level
+                && Objects.equals(fileName, that.fileName);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kind, partition, bucket, totalBuckets, file);
+        return Objects.hash(kind, partition, bucket, level, fileName);
     }
 
     @Override
     public String toString() {
-        return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, 
totalBuckets, file);
+        return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, 
level, fileName);
     }
 
-    public static Collection<ManifestEntry> mergeEntries(List<ManifestEntry> 
entries) {
-        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+    public static <T extends AbstractManifestEntry> Collection<T> mergeEntries(
+            Iterable<T> entries) {
+        LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
         mergeEntries(entries, map);
         return map.values();
     }
 
-    public static void mergeEntries(
-            List<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
-        for (ManifestEntry entry : entries) {
-            ManifestEntry.Identifier identifier = entry.identifier();
+    public static <T extends AbstractManifestEntry> void mergeEntries(
+            Iterable<T> entries, Map<Identifier, T> map) {
+        for (T entry : entries) {
+            Identifier identifier = entry.identifier();
             switch (entry.kind()) {
                 case ADD:
                     Preconditions.checkState(
@@ -149,18 +137,9 @@ public class ManifestEntry {
         }
     }
 
-    public static void assertNoDelete(Collection<ManifestEntry> entries) {
-        for (ManifestEntry entry : entries) {
-            Preconditions.checkState(
-                    entry.kind() != FileKind.DELETE,
-                    "Trying to delete file %s which is not previously added. 
Manifest might be corrupted.",
-                    entry.file().fileName());
-        }
-    }
-
     /**
-     * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
-     * file.
+     * The same {@link Identifier} indicates that the {@link 
AbstractManifestEntry} refers to the
+     * same data file.
      */
     public static class Identifier {
         public final BinaryRow partition;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 84af10819..5ad9f9972 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -24,61 +24,30 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 
 /** Entry of a manifest file, representing an addition / deletion of a data 
file. */
-public class ManifestEntry {
+public class ManifestEntry extends AbstractManifestEntry {
 
-    private final FileKind kind;
-    // for tables without partition this field should be a row with 0 columns 
(not null)
-    private final BinaryRow partition;
-    private final int bucket;
-    private final int totalBuckets;
     private final DataFileMeta file;
 
     public ManifestEntry(
             FileKind kind, BinaryRow partition, int bucket, int totalBuckets, 
DataFileMeta file) {
-        this.kind = kind;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.totalBuckets = totalBuckets;
+        super(kind, file.fileName(), partition, bucket, totalBuckets, 
file.level());
         this.file = file;
     }
 
-    public FileKind kind() {
-        return kind;
-    }
-
-    public BinaryRow partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public int totalBuckets() {
-        return totalBuckets;
-    }
-
     public DataFileMeta file() {
         return file;
     }
 
-    public Identifier identifier() {
-        return new Identifier(partition, bucket, file.level(), 
file.fileName());
-    }
-
     public static RowType schema() {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
@@ -112,43 +81,6 @@ public class ManifestEntry {
         return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, 
totalBuckets, file);
     }
 
-    public static Collection<ManifestEntry> mergeEntries(List<ManifestEntry> 
entries) {
-        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        mergeEntries(entries, map);
-        return map.values();
-    }
-
-    public static void mergeEntries(
-            List<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
-        for (ManifestEntry entry : entries) {
-            ManifestEntry.Identifier identifier = entry.identifier();
-            switch (entry.kind()) {
-                case ADD:
-                    Preconditions.checkState(
-                            !map.containsKey(identifier),
-                            "Trying to add file %s which is already added. 
Manifest might be corrupted.",
-                            identifier);
-                    map.put(identifier, entry);
-                    break;
-                case DELETE:
-                    // each dataFile will only be added once and deleted once,
-                    // if we know that it is added before then both add and 
delete entry can be
-                    // removed because there won't be further operations on 
this file,
-                    // otherwise we have to keep the delete entry because the 
add entry must be
-                    // in the previous manifest files
-                    if (map.containsKey(identifier)) {
-                        map.remove(identifier);
-                    } else {
-                        map.put(identifier, entry);
-                    }
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + entry.kind().name());
-            }
-        }
-    }
-
     public static void assertNoDelete(Collection<ManifestEntry> entries) {
         for (ManifestEntry entry : entries) {
             Preconditions.checkState(
@@ -157,54 +89,4 @@ public class ManifestEntry {
                     entry.file().fileName());
         }
     }
-
-    /**
-     * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
-     * file.
-     */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String 
fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partition, bucket, level, fileName);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, 
fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0650d9a55..f81a5cdc5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.AbstractManifestEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -35,8 +36,9 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ParallellyExecuteUtils;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -47,7 +49,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -78,6 +79,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Filter<Integer> levelFilter = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
+    private Integer scanManifestParallelism;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -87,7 +89,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets) {
+            boolean checkNumOfBuckets,
+            Integer scanManifestParallelism) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
         checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys 
should not be empty.");
@@ -99,6 +102,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.tableSchemas = new ConcurrentHashMap<>();
+        this.scanManifestParallelism = scanManifestParallelism;
     }
 
     @Override
@@ -195,6 +199,28 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public Plan plan() {
+
+        Pair<Long, List<ManifestEntry>> planResult = 
doPlan(this::readManifestFileMeta);
+
+        final Long readSnapshotId = planResult.getLeft();
+        final List<ManifestEntry> files = planResult.getRight();
+
+        return new Plan() {
+            @Nullable
+            @Override
+            public Long snapshotId() {
+                return readSnapshotId;
+            }
+
+            @Override
+            public List<ManifestEntry> files() {
+                return files;
+            }
+        };
+    }
+
+    private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
+            Function<ManifestFileMeta, List<T>> readManifest) {
         List<ManifestFileMeta> manifests = specifiedManifests;
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
@@ -209,28 +235,21 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             }
         }
 
-        final Long readSnapshot = snapshotId;
         final List<ManifestFileMeta> readManifests = manifests;
 
-        List<ManifestEntry> entries;
-        try {
-            entries =
-                    FileUtils.COMMON_IO_FORK_JOIN_POOL
-                            .submit(
-                                    () ->
-                                            readManifests
-                                                    .parallelStream()
-                                                    
.filter(this::filterManifestFileMeta)
-                                                    .flatMap(m -> 
readManifestFileMeta(m).stream())
-                                                    
.filter(this::filterByStats)
-                                                    
.collect(Collectors.toList()))
-                            .get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException("Failed to read ManifestEntry list 
concurrently", e);
-        }
-
-        List<ManifestEntry> files = new ArrayList<>();
-        for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
+        Iterable<T> entries =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        files ->
+                                files.parallelStream()
+                                        .filter(this::filterManifestFileMeta)
+                                        .flatMap(m -> 
readManifest.apply(m).stream())
+                                        .filter(this::filterByStats)
+                                        .collect(Collectors.toList()),
+                        readManifests,
+                        scanManifestParallelism);
+
+        List<T> files = new ArrayList<>();
+        for (T file : AbstractManifestEntry.mergeEntries(entries)) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
                         partitionConverter.getArity() > 0
@@ -249,7 +268,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             }
 
             // bucket filter should not be applied along with partition filter
-            // because the specifiedBucket is computed against the current 
numOfBuckets
+            // because the specifiedBucket is computed against the current
+            // numOfBuckets
             // however entry.bucket() was computed against the old numOfBuckets
             // and thus the filtered manifest entries might be empty
             // which renders the bucket check invalid
@@ -257,19 +277,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 files.add(file);
             }
         }
-
-        return new Plan() {
-            @Nullable
-            @Override
-            public Long snapshotId() {
-                return readSnapshot;
-            }
-
-            @Override
-            public List<ManifestEntry> files() {
-                return files;
-            }
-        };
+        return Pair.of(snapshotId, files);
     }
 
     private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
@@ -316,19 +324,29 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByBucket(ManifestEntry entry) {
+    private boolean filterByBucket(AbstractManifestEntry entry) {
         return (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByBucketSelector(ManifestEntry entry) {
+    private boolean filterByBucketSelector(AbstractManifestEntry entry) {
         return (bucketSelector == null
                 || bucketSelector.select(entry.bucket(), 
entry.totalBuckets()));
     }
 
     /** Note: Keep this thread-safe. */
-    private boolean filterByLevel(ManifestEntry entry) {
-        return (levelFilter == null || levelFilter.test(entry.file().level()));
+    private boolean filterByLevel(AbstractManifestEntry entry) {
+        return (levelFilter == null || levelFilter.test(entry.level()));
+    }
+
+    /** Note: Keep this thread-safe. */
+    private boolean filterByStats(AbstractManifestEntry entry) {
+        // filterByStats is an action that is completed as much as possible 
and does not have an
+        // impact if it is not done.
+        if (entry instanceof ManifestEntry) {
+            return filterByStats((ManifestEntry) entry);
+        }
+        return true;
     }
 
     /** Note: Keep this thread-safe. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index fdab17423..97d761d10 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -52,7 +52,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets) {
+            boolean checkNumOfBuckets,
+            Integer scanManifestParallelism) {
         super(
                 partitionType,
                 bucketKeyType,
@@ -61,7 +62,8 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
-                checkNumOfBuckets);
+                checkNumOfBuckets,
+                scanManifestParallelism);
         this.rowType = rowType;
         this.fieldStatsConverters =
                 new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), 
schemaId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a35177b00..5f85ba028 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -54,7 +54,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
-            boolean checkNumOfBuckets) {
+            boolean checkNumOfBuckets,
+            Integer scanManifestParallelism) {
         super(
                 partitionType,
                 bucketKeyType,
@@ -63,7 +64,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
-                checkNumOfBuckets);
+                checkNumOfBuckets,
+                scanManifestParallelism);
         this.fieldStatsConverters =
                 new FieldStatsConverters(
                         sid -> 
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java
new file mode 100644
index 000000000..93f52ad14
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * This class is a parallel execution util class, which mainly aim to process 
tasks parallelly with
+ * memory control.
+ */
+public class ParallellyExecuteUtils {
+
+    // reduce memory usage by batch iterable process, the cached result in 
memory will be queueSize
+    public static <T, U> Iterable<T> parallelismBatchIterable(
+            Function<List<U>, List<T>> processor, List<U> input, Integer 
queueSize) {
+        if (queueSize == null) {
+            queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism();
+        } else if (queueSize <= 0) {
+            throw new NegativeArraySizeException("queue size should not be 
negetive");
+        }
+
+        final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input, 
queueSize));
+
+        return () ->
+                new Iterator<T>() {
+                    List<T> activeList = null;
+                    private int index = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        advanceIfNeeded();
+                        return activeList != null && index < activeList.size();
+                    }
+
+                    @Override
+                    public T next() {
+                        advanceIfNeeded();
+                        if (activeList == null || index >= activeList.size()) {
+                            throw new NoSuchElementException();
+                        }
+                        return activeList.get(index++);
+                    }
+
+                    private void advanceIfNeeded() {
+                        while ((activeList == null || index >= 
activeList.size())
+                                && stack.size() > 0) {
+                            // reset index
+                            index = 0;
+                            try {
+                                activeList =
+                                        CompletableFuture.supplyAsync(
+                                                        () -> 
processor.apply(stack.poll()),
+                                                        
FileUtils.COMMON_IO_FORK_JOIN_POOL)
+                                                .get();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+                };
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
new file mode 100644
index 000000000..6d1fd1321
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** This test mainly test for the methods in {@link ParallellyExecuteUtils}. */
+public class ParallellyExecuteUtilsTest {
+
+    @Test
+    public void testParallelismBatchIterable() {
+        List<Integer> nums = new ArrayList<>();
+
+        for (int i = 0; i < 10000; i++) {
+            nums.add(i);
+        }
+
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        nums,
+                        null);
+
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        re.forEach(
+                i ->
+                        Assertions.assertThat(i)
+                                
.isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1));
+    }
+
+    @Test
+    public void testParallelismBatchIterable2() {
+        List<Integer> nums = new ArrayList<>();
+
+        for (int i = 0; i < 12345; i++) {
+            nums.add(i);
+        }
+
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        nums,
+                        null);
+
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        re.forEach(
+                i ->
+                        Assertions.assertThat(i)
+                                
.isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 1));
+    }
+
+    @Test
+    public void testParallelismBatchIterable3() {
+        List<Integer> nums = new ArrayList<>();
+
+        for (int i = 0; i < 10000; i++) {
+            nums.add(i);
+        }
+
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        nums,
+                        null);
+
+        Iterator<Integer> iterator = re.iterator();
+        for (int i = 0; i < 100; i++) {
+            iterator.hasNext();
+        }
+
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        while (iterator.hasNext()) {
+            Integer i = iterator.next();
+            
Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 
1);
+        }
+    }
+
+    @Test
+    public void testParallelismBatchIterable4() {
+        List<Integer> nums = new ArrayList<>();
+
+        for (int i = 0; i < 12345; i++) {
+            nums.add(i);
+        }
+
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        nums,
+                        null);
+
+        Iterator<Integer> iterator = re.iterator();
+        for (int i = 0; i < 123; i++) {
+            iterator.hasNext();
+        }
+
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        while (iterator.hasNext()) {
+            Integer i = iterator.next();
+            
Assertions.assertThat(i).isEqualTo(nums.get(atomicInteger.getAndIncrement()) + 
1);
+        }
+    }
+
+    @Test
+    public void testForEmptyInput() {
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        (List<Integer>) Collections.EMPTY_LIST,
+                        null);
+        Assertions.assertThat(!re.iterator().hasNext()).isTrue();
+    }
+
+    @Test
+    public void testForSingletonInput() {
+        Iterable<Integer> re =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        l -> l.parallelStream().map(i -> i + 
1).collect(Collectors.toList()),
+                        Collections.singletonList(1),
+                        null);
+        re.forEach(i -> Assertions.assertThat(i).isEqualTo(2));
+    }
+
+    @Test
+    public void testDifferentQueueSizeWithFilterElement() {
+        for (int queueSize = 1; queueSize < 20; queueSize++) {
+            Iterable<Integer> re =
+                    ParallellyExecuteUtils.parallelismBatchIterable(
+                            l -> l.parallelStream().filter(i -> i > 
5).collect(Collectors.toList()),
+                            Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+                            queueSize);
+            Integer[] result = new Integer[] {6, 7, 8, 9, 10};
+
+            Assertions.assertThat(re).hasSameElementsAs(Arrays.asList(result));
+        }
+    }
+}


Reply via email to