This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d23c7da2 [core] limit parallelly read file memory usage, extract some
methods (#1072)
5d23c7da2 is described below
commit 5d23c7da2ae4973c7d70f9e89a4b83220264142c
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 8 14:02:28 2023 +0800
[core] limit parallelly read file memory usage, extract some methods (#1072)
---
.../shortcodes/generated/core_configuration.html | 18 ++-
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../main/java/org/apache/paimon/CoreOptions.java | 13 ++
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
...nifestEntry.java => AbstractManifestEntry.java} | 93 +++++-------
.../org/apache/paimon/manifest/ManifestEntry.java | 122 +---------------
.../paimon/operation/AbstractFileStoreScan.java | 100 +++++++------
.../paimon/operation/AppendOnlyFileStoreScan.java | 6 +-
.../paimon/operation/KeyValueFileStoreScan.java | 6 +-
.../paimon/utils/ParallellyExecuteUtils.java | 86 +++++++++++
.../paimon/utils/ParallellyExecuteUtilsTest.java | 162 +++++++++++++++++++++
11 files changed, 382 insertions(+), 230 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index edf14f17f..25c057926 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -194,12 +194,6 @@
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li></ul></td>
</tr>
- <tr>
- <td><h5>sort-engine</h5></td>
- <td style="word-wrap: break-word;">loser-tree</td>
- <td><p>Enum</p></td>
- <td>Specify the sort engine for table with primary key.<br /><br
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared
with heapsort, loser-tree has fewer comparisons and is more
efficient.</li></ul></td>
- </tr>
<tr>
<td><h5>num-levels</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -296,6 +290,12 @@
<td>Long</td>
<td>End condition "watermark" for bounded streaming mode. Stream
reading will end when a larger watermark snapshot is encountered.</td>
</tr>
+ <tr>
+ <td><h5>scan.manifest.parallelism</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>The parallelism of scanning manifest files, default value is
the size of cpu processor.Note: Scale-up this parameter will increase memory
usage while scanning manifest files.We can consider downsize it when we
encounter an out of memory exception while scanning</td>
+ </tr>
<tr>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
@@ -344,6 +344,12 @@
<td>Duration</td>
<td>The maximum time of completed snapshots to retain.</td>
</tr>
+ <tr>
+ <td><h5>sort-engine</h5></td>
+ <td style="word-wrap: break-word;">loser-tree</td>
+ <td><p>Enum</p></td>
+ <td>Specify the sort engine for table with primary key.<br /><br
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared
with heapsort, loser-tree has fewer comparisons and is more
efficient.</li></ul></td>
+ </tr>
<tr>
<td><h5>source.split.open-file-cost</h5></td>
<td style="word-wrap: break-word;">4 mb</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 8e28b5b2d..41abd3cae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -96,7 +96,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
options.bucket(),
- forWrite);
+ forWrite,
+ options.scanManifestParallelism());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 0405d021f..d5cb05528 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -387,6 +387,15 @@ public class CoreOptions implements Serializable {
"End condition \"watermark\" for bounded streaming
mode. Stream"
+ " reading will end when a larger
watermark snapshot is encountered.");
+ public static final ConfigOption<Integer> SCAN_MANIFEST_PARALLELISM =
+ key("scan.manifest.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The parallelism of scanning manifest files,
default value is the size of cpu processor."
+ + "Note: Scale-up this parameter will
increase memory usage while scanning manifest files."
+ + "We can consider downsize it when we
encounter an out of memory exception while scanning");
+
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
key("log.consistency")
.enumType(LogConsistency.class)
@@ -844,6 +853,10 @@ public class CoreOptions implements Serializable {
return options.get(SCAN_SNAPSHOT_ID);
}
+ public Integer scanManifestParallelism() {
+ return options.get(SCAN_MANIFEST_PARALLELISM);
+ }
+
public Optional<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 77fbd5d76..e1e7819b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -121,7 +121,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
options.bucket(),
- forWrite);
+ forWrite,
+ options.scanManifestParallelism());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
similarity index 65%
copy from
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
copy to
paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
index 84af10819..cdd54cc0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
@@ -19,40 +19,37 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
-
-/** Entry of a manifest file, representing an addition / deletion of a data
file. */
-public class ManifestEntry {
-
- private final FileKind kind;
+/** Abstract a simplest model of manifest file. */
+public abstract class AbstractManifestEntry {
+ protected final FileKind kind;
+ protected final String fileName;
// for tables without partition this field should be a row with 0 columns
(not null)
- private final BinaryRow partition;
- private final int bucket;
- private final int totalBuckets;
- private final DataFileMeta file;
-
- public ManifestEntry(
- FileKind kind, BinaryRow partition, int bucket, int totalBuckets,
DataFileMeta file) {
+ protected final BinaryRow partition;
+ protected final int bucket;
+ protected final int totalBuckets;
+ protected final int level;
+
+ public AbstractManifestEntry(
+ FileKind kind,
+ String fileName,
+ BinaryRow partition,
+ int bucket,
+ int totalBuckets,
+ int level) {
this.kind = kind;
+ this.fileName = fileName;
this.partition = partition;
this.bucket = bucket;
this.totalBuckets = totalBuckets;
- this.file = file;
+ this.level = level;
}
public FileKind kind() {
@@ -71,57 +68,48 @@ public class ManifestEntry {
return totalBuckets;
}
- public DataFileMeta file() {
- return file;
+ public int level() {
+ return level;
}
public Identifier identifier() {
- return new Identifier(partition, bucket, file.level(),
file.fileName());
- }
-
- public static RowType schema() {
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
- fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
- fields.add(new DataField(2, "_BUCKET", new IntType(false)));
- fields.add(new DataField(3, "_TOTAL_BUCKETS", new IntType(false)));
- fields.add(new DataField(4, "_FILE", DataFileMeta.schema()));
- return new RowType(fields);
+ return new Identifier(partition, bucket, level, fileName);
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof ManifestEntry)) {
+ if (!(o instanceof AbstractManifestEntry)) {
return false;
}
- ManifestEntry that = (ManifestEntry) o;
+ AbstractManifestEntry that = (AbstractManifestEntry) o;
return Objects.equals(kind, that.kind)
&& Objects.equals(partition, that.partition)
&& bucket == that.bucket
- && totalBuckets == that.totalBuckets
- && Objects.equals(file, that.file);
+ && level == that.level
+ && Objects.equals(fileName, that.fileName);
}
@Override
public int hashCode() {
- return Objects.hash(kind, partition, bucket, totalBuckets, file);
+ return Objects.hash(kind, partition, bucket, level, fileName);
}
@Override
public String toString() {
- return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
+ return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
level, fileName);
}
- public static Collection<ManifestEntry> mergeEntries(List<ManifestEntry>
entries) {
- LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+ public static <T extends AbstractManifestEntry> Collection<T> mergeEntries(
+ Iterable<T> entries) {
+ LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
mergeEntries(entries, map);
return map.values();
}
- public static void mergeEntries(
- List<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
- for (ManifestEntry entry : entries) {
- ManifestEntry.Identifier identifier = entry.identifier();
+ public static <T extends AbstractManifestEntry> void mergeEntries(
+ Iterable<T> entries, Map<Identifier, T> map) {
+ for (T entry : entries) {
+ Identifier identifier = entry.identifier();
switch (entry.kind()) {
case ADD:
Preconditions.checkState(
@@ -149,18 +137,9 @@ public class ManifestEntry {
}
}
- public static void assertNoDelete(Collection<ManifestEntry> entries) {
- for (ManifestEntry entry : entries) {
- Preconditions.checkState(
- entry.kind() != FileKind.DELETE,
- "Trying to delete file %s which is not previously added.
Manifest might be corrupted.",
- entry.file().fileName());
- }
- }
-
/**
- * The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
- * file.
+ * The same {@link Identifier} indicates that the {@link
AbstractManifestEntry} refers to the
+ * same data file.
*/
public static class Identifier {
public final BinaryRow partition;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 84af10819..5ad9f9972 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -24,61 +24,30 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
/** Entry of a manifest file, representing an addition / deletion of a data
file. */
-public class ManifestEntry {
+public class ManifestEntry extends AbstractManifestEntry {
- private final FileKind kind;
- // for tables without partition this field should be a row with 0 columns
(not null)
- private final BinaryRow partition;
- private final int bucket;
- private final int totalBuckets;
private final DataFileMeta file;
public ManifestEntry(
FileKind kind, BinaryRow partition, int bucket, int totalBuckets,
DataFileMeta file) {
- this.kind = kind;
- this.partition = partition;
- this.bucket = bucket;
- this.totalBuckets = totalBuckets;
+ super(kind, file.fileName(), partition, bucket, totalBuckets,
file.level());
this.file = file;
}
- public FileKind kind() {
- return kind;
- }
-
- public BinaryRow partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public int totalBuckets() {
- return totalBuckets;
- }
-
public DataFileMeta file() {
return file;
}
- public Identifier identifier() {
- return new Identifier(partition, bucket, file.level(),
file.fileName());
- }
-
public static RowType schema() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
@@ -112,43 +81,6 @@ public class ManifestEntry {
return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
}
- public static Collection<ManifestEntry> mergeEntries(List<ManifestEntry>
entries) {
- LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
- mergeEntries(entries, map);
- return map.values();
- }
-
- public static void mergeEntries(
- List<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
- for (ManifestEntry entry : entries) {
- ManifestEntry.Identifier identifier = entry.identifier();
- switch (entry.kind()) {
- case ADD:
- Preconditions.checkState(
- !map.containsKey(identifier),
- "Trying to add file %s which is already added.
Manifest might be corrupted.",
- identifier);
- map.put(identifier, entry);
- break;
- case DELETE:
- // each dataFile will only be added once and deleted once,
- // if we know that it is added before then both add and
delete entry can be
- // removed because there won't be further operations on
this file,
- // otherwise we have to keep the delete entry because the
add entry must be
- // in the previous manifest files
- if (map.containsKey(identifier)) {
- map.remove(identifier);
- } else {
- map.put(identifier, entry);
- }
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
- }
- }
- }
-
public static void assertNoDelete(Collection<ManifestEntry> entries) {
for (ManifestEntry entry : entries) {
Preconditions.checkState(
@@ -157,54 +89,4 @@ public class ManifestEntry {
entry.file().fileName());
}
}
-
- /**
- * The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
- * file.
- */
- public static class Identifier {
- public final BinaryRow partition;
- public final int bucket;
- public final int level;
- public final String fileName;
-
- private Identifier(BinaryRow partition, int bucket, int level, String
fileName) {
- this.partition = partition;
- this.bucket = bucket;
- this.level = level;
- this.fileName = fileName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Identifier)) {
- return false;
- }
- Identifier that = (Identifier) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level
- && Objects.equals(fileName, that.fileName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partition, bucket, level, fileName);
- }
-
- @Override
- public String toString() {
- return String.format("{%s, %d, %d, %s}", partition, bucket, level,
fileName);
- }
-
- public String toString(FileStorePathFactory pathFactory) {
- return pathFactory.getPartitionString(partition)
- + ", bucket "
- + bucket
- + ", level "
- + level
- + ", file "
- + fileName;
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0650d9a55..f81a5cdc5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.AbstractManifestEntry;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -35,8 +36,9 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ParallellyExecuteUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
@@ -47,7 +49,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -78,6 +79,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Filter<Integer> levelFilter = null;
private ManifestCacheFilter manifestCacheFilter = null;
+ private Integer scanManifestParallelism;
public AbstractFileStoreScan(
RowType partitionType,
@@ -87,7 +89,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets) {
+ boolean checkNumOfBuckets,
+ Integer scanManifestParallelism) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys
should not be empty.");
@@ -99,6 +102,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
+ this.scanManifestParallelism = scanManifestParallelism;
}
@Override
@@ -195,6 +199,28 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public Plan plan() {
+
+ Pair<Long, List<ManifestEntry>> planResult =
doPlan(this::readManifestFileMeta);
+
+ final Long readSnapshotId = planResult.getLeft();
+ final List<ManifestEntry> files = planResult.getRight();
+
+ return new Plan() {
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ return readSnapshotId;
+ }
+
+ @Override
+ public List<ManifestEntry> files() {
+ return files;
+ }
+ };
+ }
+
+ private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
+ Function<ManifestFileMeta, List<T>> readManifest) {
List<ManifestFileMeta> manifests = specifiedManifests;
Long snapshotId = specifiedSnapshotId;
if (manifests == null) {
@@ -209,28 +235,21 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
}
- final Long readSnapshot = snapshotId;
final List<ManifestFileMeta> readManifests = manifests;
- List<ManifestEntry> entries;
- try {
- entries =
- FileUtils.COMMON_IO_FORK_JOIN_POOL
- .submit(
- () ->
- readManifests
- .parallelStream()
-
.filter(this::filterManifestFileMeta)
- .flatMap(m ->
readManifestFileMeta(m).stream())
-
.filter(this::filterByStats)
-
.collect(Collectors.toList()))
- .get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed to read ManifestEntry list
concurrently", e);
- }
-
- List<ManifestEntry> files = new ArrayList<>();
- for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
+ Iterable<T> entries =
+ ParallellyExecuteUtils.parallelismBatchIterable(
+ files ->
+ files.parallelStream()
+ .filter(this::filterManifestFileMeta)
+ .flatMap(m ->
readManifest.apply(m).stream())
+ .filter(this::filterByStats)
+ .collect(Collectors.toList()),
+ readManifests,
+ scanManifestParallelism);
+
+ List<T> files = new ArrayList<>();
+ for (T file : AbstractManifestEntry.mergeEntries(entries)) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionConverter.getArity() > 0
@@ -249,7 +268,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
// bucket filter should not be applied along with partition filter
- // because the specifiedBucket is computed against the current
numOfBuckets
+ // because the specifiedBucket is computed against the current
+ // numOfBuckets
// however entry.bucket() was computed against the old numOfBuckets
// and thus the filtered manifest entries might be empty
// which renders the bucket check invalid
@@ -257,19 +277,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
files.add(file);
}
}
-
- return new Plan() {
- @Nullable
- @Override
- public Long snapshotId() {
- return readSnapshot;
- }
-
- @Override
- public List<ManifestEntry> files() {
- return files;
- }
- };
+ return Pair.of(snapshotId, files);
}
private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
@@ -316,19 +324,29 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
/** Note: Keep this thread-safe. */
- private boolean filterByBucket(ManifestEntry entry) {
+ private boolean filterByBucket(AbstractManifestEntry entry) {
return (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
/** Note: Keep this thread-safe. */
- private boolean filterByBucketSelector(ManifestEntry entry) {
+ private boolean filterByBucketSelector(AbstractManifestEntry entry) {
return (bucketSelector == null
|| bucketSelector.select(entry.bucket(),
entry.totalBuckets()));
}
/** Note: Keep this thread-safe. */
- private boolean filterByLevel(ManifestEntry entry) {
- return (levelFilter == null || levelFilter.test(entry.file().level()));
+ private boolean filterByLevel(AbstractManifestEntry entry) {
+ return (levelFilter == null || levelFilter.test(entry.level()));
+ }
+
+ /** Note: Keep this thread-safe. */
+ private boolean filterByStats(AbstractManifestEntry entry) {
+ // filterByStats is an action that is completed as much as possible
and does not have an
+ // impact if it is not done.
+ if (entry instanceof ManifestEntry) {
+ return filterByStats((ManifestEntry) entry);
+ }
+ return true;
}
/** Note: Keep this thread-safe. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index fdab17423..97d761d10 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -52,7 +52,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets) {
+ boolean checkNumOfBuckets,
+ Integer scanManifestParallelism) {
super(
partitionType,
bucketKeyType,
@@ -61,7 +62,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
manifestFileFactory,
manifestListFactory,
numOfBuckets,
- checkNumOfBuckets);
+ checkNumOfBuckets,
+ scanManifestParallelism);
this.rowType = rowType;
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(),
schemaId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a35177b00..5f85ba028 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -54,7 +54,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
- boolean checkNumOfBuckets) {
+ boolean checkNumOfBuckets,
+ Integer scanManifestParallelism) {
super(
partitionType,
bucketKeyType,
@@ -63,7 +64,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
manifestFileFactory,
manifestListFactory,
numOfBuckets,
- checkNumOfBuckets);
+ checkNumOfBuckets,
+ scanManifestParallelism);
this.fieldStatsConverters =
new FieldStatsConverters(
sid ->
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java
new file mode 100644
index 000000000..93f52ad14
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/ParallellyExecuteUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * This class is a parallel execution util class, which mainly aims to process tasks in parallel
+ * with memory control.
+ */
+public class ParallellyExecuteUtils {
+
+    private ParallellyExecuteUtils() {}
+
+    /**
+     * Lazily processes {@code input} in batches of at most {@code queueSize} elements to bound
+     * memory usage: the returned iterable runs {@code processor} on one batch at a time (on {@link
+     * FileUtils#COMMON_IO_FORK_JOIN_POOL}) and only buffers that batch's results.
+     *
+     * <p>Results come back batch by batch in input order, and within a batch in the order of the
+     * list returned by {@code processor}; batches with no results are skipped. The returned
+     * iterable can be iterated more than once: every {@link Iterable#iterator()} call re-processes
+     * the batches from the beginning.
+     *
+     * @param processor turns one batch of inputs into a list of results
+     * @param input the elements to process
+     * @param queueSize maximum number of input elements per batch, or {@code null} to use the
+     *     parallelism of {@link FileUtils#COMMON_IO_FORK_JOIN_POOL}
+     * @return a lazily evaluated iterable over the results of all batches
+     * @throws IllegalArgumentException if {@code queueSize} is not positive
+     */
+    public static <T, U> Iterable<T> parallelismBatchIterable(
+            Function<List<U>, List<T>> processor, List<U> input, Integer queueSize) {
+        final int batchSize;
+        if (queueSize == null) {
+            batchSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism();
+        } else if (queueSize <= 0) {
+            throw new IllegalArgumentException(
+                    "queue size should be positive, but is " + queueSize);
+        } else {
+            batchSize = queueSize;
+        }
+
+        // Lists.partition only creates views over input, so partitioning eagerly is cheap.
+        final List<List<U>> batches = Lists.partition(input, batchSize);
+        // Each iterator keeps its own cursor, so the iterable can be traversed more than once;
+        // a single queue shared by all iterators would leave every iterator but the first empty.
+        return () -> new BatchIterator<>(processor, batches);
+    }
+
+    /** Iterator that processes one batch at a time and only buffers that batch's results. */
+    private static class BatchIterator<T, U> implements Iterator<T> {
+
+        private final Function<List<U>, List<T>> processor;
+        private final List<List<U>> batches;
+
+        /** Index of the next batch to process. */
+        private int nextBatch = 0;
+        /** Results of the most recently processed batch; null before the first batch. */
+        private List<T> activeList = null;
+        /** Position of the next element to return from {@link #activeList}. */
+        private int index = 0;
+
+        private BatchIterator(Function<List<U>, List<T>> processor, List<List<U>> batches) {
+            this.processor = processor;
+            this.batches = batches;
+        }
+
+        @Override
+        public boolean hasNext() {
+            advanceIfNeeded();
+            return activeList != null && index < activeList.size();
+        }
+
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return activeList.get(index++);
+        }
+
+        /** Processes further batches until one has unread results or all batches are used. */
+        private void advanceIfNeeded() {
+            while ((activeList == null || index >= activeList.size())
+                    && nextBatch < batches.size()) {
+                activeList = processBatch(batches.get(nextBatch++));
+                index = 0;
+            }
+        }
+
+        /**
+         * Runs the processor on {@link FileUtils#COMMON_IO_FORK_JOIN_POOL} and waits for the
+         * result, presumably so that parallel streams inside the processor use that pool instead
+         * of the JVM common pool.
+         */
+        private List<T> processBatch(List<U> batch) {
+            try {
+                return CompletableFuture.supplyAsync(
+                                () -> processor.apply(batch),
+                                FileUtils.COMMON_IO_FORK_JOIN_POOL)
+                        .get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while processing a batch.", e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Failed to process a batch.", e.getCause());
+            }
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
new file mode 100644
index 000000000..6d1fd1321
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallellyExecuteUtilsTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Tests for {@link ParallellyExecuteUtils}. */
+public class ParallellyExecuteUtilsTest {
+
+    /** Processor that adds one to every element of a batch, preserving order. */
+    private static final Function<List<Integer>, List<Integer>> INCREMENT =
+            l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList());
+
+    @Test
+    public void testParallelismBatchIterable() {
+        List<Integer> nums = numbers(10000);
+        Assertions.assertThat(collect(incrementAll(nums)))
+                .containsExactlyElementsOf(plusOne(nums));
+    }
+
+    @Test
+    public void testParallelismBatchIterable2() {
+        List<Integer> nums = numbers(12345);
+        Assertions.assertThat(collect(incrementAll(nums)))
+                .containsExactlyElementsOf(plusOne(nums));
+    }
+
+    @Test
+    public void testParallelismBatchIterable3() {
+        assertRepeatedHasNextDoesNotConsume(10000, 100);
+    }
+
+    @Test
+    public void testParallelismBatchIterable4() {
+        assertRepeatedHasNextDoesNotConsume(12345, 123);
+    }
+
+    @Test
+    public void testForEmptyInput() {
+        Iterator<Integer> iterator = incrementAll(Collections.<Integer>emptyList()).iterator();
+        Assertions.assertThat(iterator.hasNext()).isFalse();
+        Assertions.assertThatThrownBy(iterator::next)
+                .isInstanceOf(NoSuchElementException.class);
+    }
+
+    @Test
+    public void testForSingletonInput() {
+        Assertions.assertThat(collect(incrementAll(Collections.singletonList(1))))
+                .containsExactly(2);
+    }
+
+    @Test
+    public void testDifferentQueueSizeWithFilterElement() {
+        for (int queueSize = 1; queueSize < 20; queueSize++) {
+            Iterable<Integer> re =
+                    ParallellyExecuteUtils.parallelismBatchIterable(
+                            l -> l.parallelStream().filter(i -> i > 5).collect(Collectors.toList()),
+                            Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+                            queueSize);
+            // batches whose elements are all filtered out must be skipped, and order is kept
+            Assertions.assertThat(collect(re)).containsExactly(6, 7, 8, 9, 10);
+        }
+    }
+
+    @Test
+    public void testNonPositiveQueueSizeIsRejected() {
+        for (int queueSize : new int[] {0, -1}) {
+            Assertions.assertThatThrownBy(
+                            () ->
+                                    ParallellyExecuteUtils.parallelismBatchIterable(
+                                            INCREMENT, Collections.singletonList(1), queueSize))
+                    .isInstanceOf(RuntimeException.class);
+        }
+    }
+
+    /** Returns the integers {@code [0, size)} in ascending order. */
+    private static List<Integer> numbers(int size) {
+        List<Integer> nums = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            nums.add(i);
+        }
+        return nums;
+    }
+
+    /** Expected output of {@link #INCREMENT} applied to the whole input. */
+    private static List<Integer> plusOne(List<Integer> nums) {
+        return nums.stream().map(i -> i + 1).collect(Collectors.toList());
+    }
+
+    /** Builds a batch iterable over {@code nums} using {@link #INCREMENT} and the default size. */
+    private static Iterable<Integer> incrementAll(List<Integer> nums) {
+        return ParallellyExecuteUtils.parallelismBatchIterable(INCREMENT, nums, null);
+    }
+
+    /** Drains {@code iterable} exactly once into a list, so assertions never re-iterate it. */
+    private static List<Integer> collect(Iterable<Integer> iterable) {
+        List<Integer> result = new ArrayList<>();
+        iterable.forEach(result::add);
+        return result;
+    }
+
+    /**
+     * Calls {@code hasNext()} {@code hasNextCalls} times before consuming anything, then checks
+     * that nothing was skipped: all {@code size} inputs come back incremented and in order.
+     */
+    private static void assertRepeatedHasNextDoesNotConsume(int size, int hasNextCalls) {
+        List<Integer> nums = numbers(size);
+        Iterator<Integer> iterator = incrementAll(nums).iterator();
+        for (int i = 0; i < hasNextCalls; i++) {
+            Assertions.assertThat(iterator.hasNext()).isTrue();
+        }
+        List<Integer> actual = new ArrayList<>();
+        iterator.forEachRemaining(actual::add);
+        Assertions.assertThat(actual).containsExactlyElementsOf(plusOne(nums));
+    }
+}