imbajin commented on code in PR #683:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2464759478


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,27 +595,152 @@ private void loadInputs(List<InputStruct> structs) {
         }
     }
 
-    private void loadStructs(List<InputStruct> structs) {
-        // Load input structs one by one
+    private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+                                                 boolean scatter) {
+        ArrayList<InputTaskItem> tasks = new ArrayList<>();
+        ArrayList<InputReader> readers = new ArrayList<>();
+        int curFile = 0;
+        int curIndex = 0;
         for (InputStruct struct : structs) {
-            if (this.context.stopped()) {
-                break;
-            }
             if (struct.skip()) {
                 continue;
             }
-            // Create and init InputReader, fetch next batch lines
-            try (InputReader reader = InputReader.create(struct.input())) {
-                // Init reader
-                reader.init(this.context, struct);
-                // Load data from current input mapping
-                this.loadStruct(struct, reader);
+
+            // Create and init InputReader
+            try {
+                LOG.info("Start loading: '{}'", struct);
+
+                InputReader reader = InputReader.create(struct.input());
+                List<InputReader> readerList = reader.multiReaders() ?
+                                               reader.split() :
+                                               ImmutableList.of(reader);
+                readers.addAll(readerList);
+
+                LOG.info("total {} found in '{}'", readerList.size(), struct);
+                tasks.ensureCapacity(tasks.size() + readerList.size());
+                int seq = 0;
+                for (InputReader r : readerList) {
+                    if (curFile >= this.context.options().startFile &&
+                        (this.context.options().endFile == -1 ||
+                         curFile < this.context.options().endFile)) {
+                        // Load data from current input mapping
+                        tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+                    } else {
+                        r.close();
+                    }
+                    seq += 1;
+                    curFile += 1;
+                }
+                if (this.context.options().endFile != -1 &&
+                    curFile >= this.context.options().endFile) {
+                    break;
+                }
             } catch (InitException e) {
                 throw new LoadException("Failed to init input reader", e);
+            } finally {
+                Set<InputReader> usedReaders = tasks.stream()
+                                                    .map(item -> item.reader)
+                                                    .collect(Collectors.toSet());
+                for (InputReader r : readers) {
+                    if (!usedReaders.contains(r)) {
+                        try {
+                            r.close();
+                        } catch (Exception ex) {
+                            LOG.warn("Failed to close reader: {}", ex.getMessage());
+                        }
+                    }
+                }
+            }
+            curIndex += 1;
+        }
+        // sort by seqNumber to allow scatter loading from different sources
+        if (scatter) {
+            tasks.sort(Comparator.comparingInt((InputTaskItem o) -> o.structIndex)
+                     .thenComparingInt(o -> o.seqNumber));
+        }
+
+        return tasks;
+    }
+
+    private void loadStructs(List<InputStruct> structs) {
+        int parallelCount = this.context.options().parallelCount;
+        if (structs.size() == 0) {
+            return;
+        }
+        if (parallelCount <= 0) {
+            parallelCount = Math.min(structs.size(), Runtime.getRuntime().availableProcessors() * 2);
+        }
+
+        boolean scatter = this.context.options().scatterSources;
+
+        LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+                 parallelCount, structs.size(), this.context.options().startFile,
+                 this.context.options().endFile,
+                 scatter ? "scatter" : "sequential");
+
+        ExecutorService loadService = ExecutorUtil.newFixedThreadPool(parallelCount,
+                                                                      "loader");
+
+        List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+        List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+        if (taskItems.isEmpty()) {
+            LOG.info("No tasks to execute after filtering");
+            return;
+        }
+
+        for (InputTaskItem item : taskItems) {
+            // Init reader
+            item.reader.init(this.context, item.struct);
+            // Load data from current input mapping
+            loadTasks.add(
+                    this.asyncLoadStruct(item.struct, item.reader,
+                                         loadService));
+        }
+
+        LOG.info("waiting for loading finish {}", loadTasks.size());
+        // wait for finish
+        try {
+            CompletableFuture.allOf(loadTasks.toArray(new CompletableFuture[0]))
+                             .join();
+        } catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof ParseException) {
+                throw (ParseException) cause;
+            } else if (cause instanceof LoadException) {
+                throw (LoadException) cause;
+            } else if (cause != null) {
+                if (cause instanceof RuntimeException) {
+                    throw (RuntimeException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            } else {
+                throw e;
             }
+        } catch (Throwable t) {
+            throw t;
+        } finally {
+            // Shutdown service

Review Comment:
   ⚠️ **Important: CompletableFuture 异常处理不完整**
   
   在处理并发任务结果时:
   ```java
   for (CompletableFuture<Void> future : loadTasks) {
       try {
           future.join();
       } catch (CompletionException e) {
           this.context.occurredError();
           throw LoadUtil.targetRuntimeException(e);
       }
   }
   ```
   
   问题:
   1. 当一个任务失败时,其他正在执行的任务应如何处理?是继续执行还是取消?
   2. 没有超时机制:如果某个任务挂起(hang),整个加载过程都会被卡死
   3. 只捕获了 CompletionException,其他异常(如 CancellationException)将如何处理?
   
   **建议:**
   ```java
   try {
       future.get(timeout, TimeUnit.SECONDS); // 使用 get 替代 join,支持超时
   } catch (TimeoutException e) {
       future.cancel(true);
       throw new LoadException("Task timeout", e);
   } catch (ExecutionException | InterruptedException e) {
       // 统一异常处理
   }
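   ```
   
   注意:`CompletableFuture.cancel(true)` 并不会中断正在执行任务的线程(在该实现中 `mayInterruptIfRunning` 参数不起作用),因此超时后还需要通过 `ExecutorService.shutdownNow()` 等方式真正停止任务;捕获 `InterruptedException` 时应调用 `Thread.currentThread().interrupt()` 恢复中断标志。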



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -153,11 +271,288 @@ private void createSchema() {
                 throw new LoadException("Failed to read schema file '%s'", e,
                                         options.schema);
             }
-            groovyExecutor.execute(script, client);
+
+            if (!options.shorterIDConfigs.isEmpty()) {
+                for (ShortIdConfig config : options.shorterIDConfigs) {
+                    PropertyKey propertyKey = client.schema().propertyKey(config.getIdFieldName())
+                                                    .ifNotExist()
+                                                    .dataType(config.getIdFieldType())
+                                                    .build();
+                    client.schema().addPropertyKey(propertyKey);
+                }
+                groovyExecutor.execute(script, client);
+                List<VertexLabel> vertexLabels = client.schema().getVertexLabels();
+                for (VertexLabel vertexLabel : vertexLabels) {
+                    ShortIdConfig config;
+                    if ((config = options.getShortIdConfig(vertexLabel.name())) != null) {
+                        config.setLabelID(vertexLabel.id());

Review Comment:
   ⚠️ **Important: Schema 创建缺少事务性保证**
   
   在 `createSchema()` 方法中,先添加 PropertyKey,再执行 Groovy 脚本,最后创建 IndexLabel:
   
   ```java
   PropertyKey propertyKey = client.schema().propertyKey(config.getIdFieldName())
                                   .ifNotExist()
                                   .dataType(config.getIdFieldType())
                                   .build();
   client.schema().addPropertyKey(propertyKey);
   
   groovyExecutor.execute(script, client);
   
   // ... later
   client.schema().addIndexLabel(indexLabel);
   ```
   
   问题:
   1. 如果中间步骤失败(如 groovyExecutor.execute()),已创建的 PropertyKey 不会回滚
   2. 多次运行可能导致部分 schema 重复创建
   3. 没有验证 Groovy 脚本的执行结果
   
   **建议:**
   - 在创建 schema 前检查是否已存在
   - 提供清理/回滚机制
   - 记录每步操作结果,便于问题排查



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to