This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 547759c55 [core] Optimize IncrementalStartingScanner to randomlyExecute (#4229)
547759c55 is described below
commit 547759c55207d54451bf0c52c2fb96fdfb8154cd
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Sep 22 22:36:45 2024 +0800
[core] Optimize IncrementalStartingScanner to randomlyExecute (#4229)
---
.../table/source/snapshot/IncrementalStartingScanner.java | 12 +++++++-----
.../java/org/apache/paimon/utils/ManifestReadThreadPool.java | 8 ++++++++
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 6a8f68891..358d86cbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -82,8 +83,8 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
.boxed()
.collect(Collectors.toList());
- Iterable<ManifestFileMeta> manifests =
- ManifestReadThreadPool.sequentialBatchedExecute(
+ Iterator<ManifestFileMeta> manifests =
+ ManifestReadThreadPool.randomlyExecute(
id -> {
Snapshot snapshot = snapshotManager.snapshot(id);
switch (scanMode) {
@@ -109,11 +110,12 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
snapshots,
reader.parallelism());
- Iterable<ManifestEntry> entries =
- ManifestReadThreadPool.sequentialBatchedExecute(
+ Iterator<ManifestEntry> entries =
+ ManifestReadThreadPool.randomlyExecute(
reader::readManifest, Lists.newArrayList(manifests),
reader.parallelism());
- for (ManifestEntry entry : entries) {
+ while (entries.hasNext()) {
+ ManifestEntry entry = entries.next();
checkArgument(
entry.kind() == FileKind.ADD, "Delta or changelog should
only have ADD files.");
grouped.compute(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 74775f488..d967e778f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.paimon.utils;
import javax.annotation.Nullable;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
@@ -51,4 +52,11 @@ public class ManifestReadThreadPool {
ThreadPoolExecutor executor = getExecutorService(threadNum);
return ThreadPoolUtils.sequentialBatchedExecute(executor, processor,
input, threadNum);
}
+
+ /**
+ * Applies {@code processor} to every element of {@code input} on the manifest-read executor
+ * and returns an iterator over all values produced by those calls.
+ *
+ * <p>Each call yields a {@code List<T>}, and the returned {@code Iterator<T>} walks the values
+ * from all of those lists. The actual scheduling and iteration are delegated to
+ * {@code ThreadPoolUtils.randomlyExecute}. NOTE(review): the ordering of the returned values
+ * relative to {@code input} is defined by that delegate and is not visible here. The name
+ * "randomly" (as opposed to the sibling {@code sequentialBatchedExecute}) suggests input
+ * order is not guaranteed, so confirm before relying on any particular order.
+ *
+ * @param processor function mapping one input element to a list of results
+ * @param input elements to process
+ * @param threadNum requested thread count passed to {@code getExecutorService}; may be
+ *     {@code null} (presumably meaning "use the default" — TODO confirm)
+ * @return iterator over the results of all {@code processor} invocations
+ */
+ public static <T, U> Iterator<T> randomlyExecute(
+ Function<U, List<T>> processor, List<U> input, @Nullable Integer
threadNum) {
+ ThreadPoolExecutor executor = getExecutorService(threadNum);
+ return ThreadPoolUtils.randomlyExecute(executor, processor, input);
+ }
}