imbajin commented on code in PR #683:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2458902776


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,25 +592,150 @@ private void loadInputs(List<InputStruct> structs) {
         }
     }
 
-    private void loadStructs(List<InputStruct> structs) {
-        // Load input structs one by one
+    private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+                                                 boolean scatter) {
+        ArrayList<InputTaskItem> tasks = new ArrayList<>();
+        ArrayList<InputReader> readers = new ArrayList<>();
+        int curFile = 0;
+        int curIndex = 0;
         for (InputStruct struct : structs) {
-            if (this.context.stopped()) {
-                break;
-            }
             if (struct.skip()) {
                 continue;
             }
-            // Create and init InputReader, fetch next batch lines
-            try (InputReader reader = InputReader.create(struct.input())) {
-                // Init reader
-                reader.init(this.context, struct);
-                // Load data from current input mapping
-                this.loadStruct(struct, reader);
+
+            // Create and init InputReader
+            try {
+                LOG.info("Start loading: '{}'", struct);
+
+                InputReader reader = InputReader.create(struct.input());
+                List<InputReader> readerList = reader.multiReaders() ?
+                                               reader.split() :
+                                               ImmutableList.of(reader);
+                readers.addAll(readerList);
+
+                LOG.info("total {} found in '{}'", readerList.size(), struct);
+                tasks.ensureCapacity(tasks.size() + readerList.size());
+                int seq = 0;
+                for (InputReader r : readerList) {
+                    if (curFile >= this.context.options().startFile &&
+                        (this.context.options().endFile == -1 ||
+                         curFile < this.context.options().endFile)) {
+                        // Load data from current input mapping
+                        tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+                    } else {
+                        r.close();
+                    }
+                    seq += 1;
+                    curFile += 1;
+                }
+                if (this.context.options().endFile != -1 &&
+                    curFile >= this.context.options().endFile) {
+                    break;
+                }
             } catch (InitException e) {
                 throw new LoadException("Failed to init input reader", e);
+            } finally {
+                Set<InputReader> usedReaders = tasks.stream()
+                                                    .map(item -> item.reader)
+                                                    
.collect(Collectors.toSet());
+                for (InputReader r : readers) {
+                    if (!usedReaders.contains(r)) {
+                        try {
+                            r.close();
+                        } catch (Exception ex) {
+                            LOG.warn("Failed to close reader: {}", 
ex.getMessage());
+                        }
+                    }
+                }
             }
+            curIndex += 1;
+        }
+        // sort by seqNumber to allow scatter loading from different sources
+        if (scatter) {
+            tasks.sort(Comparator.comparingInt((InputTaskItem o) -> 
o.structIndex)
+                     .thenComparingInt(o -> o.seqNumber));
         }
+
+        return tasks;
+    }
+
+    private void loadStructs(List<InputStruct> structs) {
+        int parallelCount = this.context.options().parallelCount;
+        if (structs.size() == 0) {
+            return;
+        }
+        if (parallelCount <= 0) {
+            parallelCount = Math.min(structs.size(), 
Runtime.getRuntime().availableProcessors() * 2);
+        }
+
+        boolean scatter = this.context.options().scatterSources;
+
+        LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+                 parallelCount, structs.size(), 
this.context.options().startFile,
+                 this.context.options().endFile,
+                 scatter ? "scatter" : "sequential");
+
+        ExecutorService loadService = 
ExecutorUtil.newFixedThreadPool(parallelCount,
+                                                                      
"loader");
+
+        List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+        List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+        if (taskItems.isEmpty()) {
+            LOG.info("No tasks to execute after filtering");
+            return;

Review Comment:
   ‼️ **错误传播不一致**: 在并发加载中,如果某个任务失败,其他任务会继续执行,可能导致部分数据加载成功、部分失败的不一致状态。
   
   **问题**: `CompletableFuture.allOf().join()` 只会抛出第一个异常,但其他任务可能已经部分执行。
   
   **建议**: 实现快速失败机制:
   ```java
   AtomicBoolean hasFailed = new AtomicBoolean(false);
   for (InputTaskItem item : taskItems) {
       if (hasFailed.get()) break;  // 快速失败
       loadTasks.add(
           this.asyncLoadStruct(item.struct, item.reader, loadService)
               .exceptionally(ex -> {
                   hasFailed.set(true);
                   // 取消其他任务
                   loadTasks.forEach(f -> f.cancel(true));
                   throw new CompletionException(ex);
               })
       );
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to