imbajin commented on code in PR #683:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2464758283


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,27 +595,152 @@ private void loadInputs(List<InputStruct> structs) {
         }
     }
 
-    private void loadStructs(List<InputStruct> structs) {
-        // Load input structs one by one
+    private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+                                                 boolean scatter) {
+        ArrayList<InputTaskItem> tasks = new ArrayList<>();
+        ArrayList<InputReader> readers = new ArrayList<>();
+        int curFile = 0;
+        int curIndex = 0;
         for (InputStruct struct : structs) {
-            if (this.context.stopped()) {
-                break;
-            }
             if (struct.skip()) {
                 continue;
             }
-            // Create and init InputReader, fetch next batch lines
-            try (InputReader reader = InputReader.create(struct.input())) {
-                // Init reader
-                reader.init(this.context, struct);
-                // Load data from current input mapping
-                this.loadStruct(struct, reader);
+
+            // Create and init InputReader
+            try {
+                LOG.info("Start loading: '{}'", struct);
+
+                InputReader reader = InputReader.create(struct.input());
+                List<InputReader> readerList = reader.multiReaders() ?
+                                               reader.split() :
+                                               ImmutableList.of(reader);
+                readers.addAll(readerList);
+
+                LOG.info("total {} found in '{}'", readerList.size(), struct);
+                tasks.ensureCapacity(tasks.size() + readerList.size());
+                int seq = 0;
+                for (InputReader r : readerList) {
+                    if (curFile >= this.context.options().startFile &&
+                        (this.context.options().endFile == -1 ||
+                         curFile < this.context.options().endFile)) {
+                        // Load data from current input mapping
+                        tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+                    } else {
+                        r.close();
+                    }
+                    seq += 1;
+                    curFile += 1;
+                }
+                if (this.context.options().endFile != -1 &&
+                    curFile >= this.context.options().endFile) {
+                    break;
+                }
             } catch (InitException e) {
                 throw new LoadException("Failed to init input reader", e);
+            } finally {
+                Set<InputReader> usedReaders = tasks.stream()
+                                                    .map(item -> item.reader)
+                                                    
.collect(Collectors.toSet());
+                for (InputReader r : readers) {
+                    if (!usedReaders.contains(r)) {
+                        try {
+                            r.close();
+                        } catch (Exception ex) {
+                            LOG.warn("Failed to close reader: {}", 
ex.getMessage());
+                        }
+                    }
+                }
+            }
+            curIndex += 1;
+        }
+        // sort by seqNumber to allow scatter loading from different sources
+        if (scatter) {
+            tasks.sort(Comparator.comparingInt((InputTaskItem o) -> 
o.structIndex)
+                     .thenComparingInt(o -> o.seqNumber));
+        }
+
+        return tasks;
+    }
+
+    private void loadStructs(List<InputStruct> structs) {
+        int parallelCount = this.context.options().parallelCount;
+        if (structs.size() == 0) {
+            return;
+        }
+        if (parallelCount <= 0) {
+            parallelCount = Math.min(structs.size(), 
Runtime.getRuntime().availableProcessors() * 2);
+        }
+
+        boolean scatter = this.context.options().scatterSources;

Review Comment:
   ‼️ **Critical: 资源泄漏风险 - ExecutorService 未正确关闭**
   
   在 `loadStructs()` 方法中创建了 ExecutorService:
   ```java
   ExecutorService loadService = ExecutorUtil.newFixedThreadPool(parallelCount, 
"loader");
   ```
   
   但在异常情况下,这个线程池可能不会被正确关闭。虽然有 finally 块调用 `loadService.shutdown()`,但:
   
   1. 如果 `prepareTaskItems()` 抛出异常,ExecutorService 还未创建,但 finally 块会尝试访问它
   2. shutdown() 之后没有 awaitTermination(),可能导致任务未完成就退出
   3. 在任务执行过程中发生异常时,已提交的任务可能处于不确定状态
   
   **建议:**
   ```java
   ExecutorService loadService = null;
   try {
       loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
       // ... 其他逻辑
   } finally {
       if (loadService != null) {
           loadService.shutdown();
           if (!loadService.awaitTermination(60, TimeUnit.SECONDS)) {
               loadService.shutdownNow();
           }
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to