This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 547759c55 [core] Optimize IncrementalStartingScanner to randomlyExecute (#4229)
547759c55 is described below

commit 547759c55207d54451bf0c52c2fb96fdfb8154cd
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Sep 22 22:36:45 2024 +0800

    [core] Optimize IncrementalStartingScanner to randomlyExecute (#4229)
---
 .../table/source/snapshot/IncrementalStartingScanner.java    | 12 +++++++-----
 .../java/org/apache/paimon/utils/ManifestReadThreadPool.java |  8 ++++++++
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 6a8f68891..358d86cbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -82,8 +83,8 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
                         .boxed()
                         .collect(Collectors.toList());
 
-        Iterable<ManifestFileMeta> manifests =
-                ManifestReadThreadPool.sequentialBatchedExecute(
+        Iterator<ManifestFileMeta> manifests =
+                ManifestReadThreadPool.randomlyExecute(
                         id -> {
                             Snapshot snapshot = snapshotManager.snapshot(id);
                             switch (scanMode) {
@@ -109,11 +110,12 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
                         snapshots,
                         reader.parallelism());
 
-        Iterable<ManifestEntry> entries =
-                ManifestReadThreadPool.sequentialBatchedExecute(
+        Iterator<ManifestEntry> entries =
+                ManifestReadThreadPool.randomlyExecute(
                         reader::readManifest, Lists.newArrayList(manifests), 
reader.parallelism());
 
-        for (ManifestEntry entry : entries) {
+        while (entries.hasNext()) {
+            ManifestEntry entry = entries.next();
             checkArgument(
                     entry.kind() == FileKind.ADD, "Delta or changelog should 
only have ADD files.");
             grouped.compute(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index 74775f488..d967e778f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.paimon.utils;
 
 import javax.annotation.Nullable;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
@@ -51,4 +52,11 @@ public class ManifestReadThreadPool {
         ThreadPoolExecutor executor = getExecutorService(threadNum);
         return ThreadPoolUtils.sequentialBatchedExecute(executor, processor, 
input, threadNum);
     }
+
+    /** Processes tasks in parallel in random order but returns values sequentially. */
+    public static <T, U> Iterator<T> randomlyExecute(
+            Function<U, List<T>> processor, List<U> input, @Nullable Integer threadNum) {
+        ThreadPoolExecutor executor = getExecutorService(threadNum);
+        return ThreadPoolUtils.randomlyExecute(executor, processor, input);
+    }
 }

Reply via email to