imbajin commented on code in PR #683:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2458902228


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,25 +592,150 @@ private void loadInputs(List<InputStruct> structs) {
         }
     }
 
-    private void loadStructs(List<InputStruct> structs) {
-        // Load input structs one by one
+    private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+                                                 boolean scatter) {
+        ArrayList<InputTaskItem> tasks = new ArrayList<>();
+        ArrayList<InputReader> readers = new ArrayList<>();
+        int curFile = 0;
+        int curIndex = 0;
         for (InputStruct struct : structs) {
-            if (this.context.stopped()) {
-                break;
-            }
             if (struct.skip()) {
                 continue;
             }
-            // Create and init InputReader, fetch next batch lines
-            try (InputReader reader = InputReader.create(struct.input())) {
-                // Init reader
-                reader.init(this.context, struct);
-                // Load data from current input mapping
-                this.loadStruct(struct, reader);
+
+            // Create and init InputReader
+            try {
+                LOG.info("Start loading: '{}'", struct);
+
+                InputReader reader = InputReader.create(struct.input());
+                List<InputReader> readerList = reader.multiReaders() ?
+                                               reader.split() :
+                                               ImmutableList.of(reader);
+                readers.addAll(readerList);
+
+                LOG.info("total {} found in '{}'", readerList.size(), struct);
+                tasks.ensureCapacity(tasks.size() + readerList.size());
+                int seq = 0;
+                for (InputReader r : readerList) {
+                    if (curFile >= this.context.options().startFile &&
+                        (this.context.options().endFile == -1 ||
+                         curFile < this.context.options().endFile)) {
+                        // Load data from current input mapping
+                        tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+                    } else {
+                        r.close();
+                    }
+                    seq += 1;
+                    curFile += 1;
+                }
+                if (this.context.options().endFile != -1 &&
+                    curFile >= this.context.options().endFile) {
+                    break;
+                }
             } catch (InitException e) {
                 throw new LoadException("Failed to init input reader", e);
+            } finally {
+                Set<InputReader> usedReaders = tasks.stream()
+                                                    .map(item -> item.reader)
+                                                    .collect(Collectors.toSet());
+                for (InputReader r : readers) {
+                    if (!usedReaders.contains(r)) {
+                        try {
+                            r.close();
+                        } catch (Exception ex) {
+                            LOG.warn("Failed to close reader: {}", ex.getMessage());
+                        }
+                    }
+                }
             }
+            curIndex += 1;
+        }
+        // sort by seqNumber to allow scatter loading from different sources
+        if (scatter) {
+            tasks.sort(Comparator.comparingInt((InputTaskItem o) -> o.structIndex)
+                     .thenComparingInt(o -> o.seqNumber));

Review Comment:
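   ⚠️ **Resource exhaustion risk**: The default value computed for `parallelCount` may be unreasonable. When `structs.size()` is very large, creating one thread per struct can exhaust system resources.
   
   **Current code**:
   ```java
   if (parallelCount <= 0) {
       parallelCount = structs.size();  // may create hundreds of threads
   }
   ```
   
   **Suggestion**: cap the default at a reasonable upper bound:
   ```java
   if (parallelCount <= 0) {
       parallelCount = Math.min(structs.size(), 
                                Runtime.getRuntime().availableProcessors() * 2);
   }
   ```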
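
The cap suggested above can be sketched as a small standalone helper. This is only an illustration under stated assumptions: the class `ParallelCountSketch` and the method `resolveParallelCount` are hypothetical names that do not appear in the PR, and the lower bound of 1 is an addition beyond the suggestion, included because an empty `structs` list would otherwise yield 0 and thread pools such as `Executors.newFixedThreadPool` reject a non-positive size.

```java
// Hypothetical helper, not part of HugeGraphLoader: it isolates the
// default-parallelism rule so the bounds can be checked on their own.
final class ParallelCountSketch {

    private ParallelCountSketch() {
    }

    // requested:           the user-supplied value; <= 0 means "pick a default"
    // structCount:         stands in for structs.size()
    // availableProcessors: passed in explicitly so the rule is deterministic
    static int resolveParallelCount(int requested, int structCount,
                                    int availableProcessors) {
        if (requested > 0) {
            // An explicit positive setting is left untouched
            return requested;
        }
        // Upper bound from the review suggestion: twice the CPU count
        int cap = Math.max(1, availableProcessors * 2);
        // Lower bound of 1 is an assumption added here, see the note above
        return Math.max(1, Math.min(structCount, cap));
    }

    // Convenience overload that reads the CPU count from the running JVM
    static int resolveParallelCount(int requested, int structCount) {
        return resolveParallelCount(requested, structCount,
                                    Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelCount(0, 500, 8)); // 16
        System.out.println(resolveParallelCount(0, 3, 8));   // 3
        System.out.println(resolveParallelCount(0, 0, 8));   // 1
        System.out.println(resolveParallelCount(4, 500, 8)); // 4
    }
}
```

Passing the processor count as a parameter keeps the rule testable on any machine; whether a factor of 2 is the right multiplier for an I/O-heavy loader is a tuning question the sketch does not answer.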



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

