imbajin commented on code in PR #704:
URL: 
https://github.com/apache/incubator-hugegraph-toolchain/pull/704#discussion_r2702527892


##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java:
##########
@@ -152,13 +156,12 @@ public void submitBatch(InputStruct struct, 
ElementMapping mapping,
         CompletableFuture.runAsync(task, this.batchService).whenComplete(
             (r, e) -> {
                 if (e != null) {
-                    LOG.warn("Batch insert {} error, try single insert",
-                             mapping.type(), e);
-                    // The time of single insert is counted separately
-                    this.submitInSingle(struct, mapping, batch);
-                } else {
-                    summary.metrics(struct).minusFlighting(batch.size());
+                    LOG.error("Batch insert {} error, interrupting import", 
mapping.type(), e);

Review Comment:
   ‼️ **Critical: 批量插入失败处理逻辑变更需要重新考虑**
   
   原逻辑在批量插入失败时会自动降级到单条插入(fallback机制),这是一个重要的容错设计。新代码直接停止整个导入流程可能过于严格。
   
   **建议**:
   1. 保留原有的降级机制作为默认行为
   2. 如果确实需要fail-fast模式,建议添加一个新的配置选项(如`--fail-on-batch-error`)让用户选择
   3. 至少应该在日志中详细记录批次失败的原因,以便用户调试
   
   **理由**:批量插入可能因为临时网络抖动、单条数据格式问题等原因失败,直接终止会影响大数据量导入的鲁棒性。



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -165,21 +169,30 @@ public final class LoadOptions implements Cloneable {
     public int singleInsertThreads = 8;
 
     @Parameter(names = {"--max-conn"}, arity = 1,
-               description = "Max number of HTTP connections to server")
-    public int maxConnections = CPUS * 4;
+               validateWith = {PositiveValidator.class},
+               description = "Max number of HTTP connections to server. " +
+                             "If left as default and batch-insert-threads is " 
+
+                             "set, this may be auto-adjusted")
+    public int maxConnections = DEFAULT_MAX_CONNECTIONS;
 
     @Parameter(names = {"--max-conn-per-route"}, arity = 1,
-               description = "Max number of HTTP connections to each route")
-    public int maxConnectionsPerRoute = CPUS * 2;
+               validateWith = {PositiveValidator.class},
+               description = "Max number of HTTP connections to each route. " +
+                             "If left as default and batch-insert-threads is " 
+
+                             "set, this may be auto-adjusted")
+    public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE;
 
     @Parameter(names = {"--batch-size"}, arity = 1,
                validateWith = {PositiveValidator.class},
                description = "The number of lines in each submit")
     public int batchSize = 500;
 
-    @Parameter(names = {"--parallel-count"}, arity = 1,
-            description = "The number of parallel read pipelines")
-    public int parallelCount = 1;
+    @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1,

Review Comment:
   ⚠️ **参数重命名可能导致向后兼容性问题**
   
   `--parallel-count` 重命名为 `--parser-threads` 会破坏现有用户的脚本和配置。
   
   **建议**:
   虽然代码中使用了双名称(`--parallel-count` 和 `--parser-threads`),但需要确认:
   1. 是否在文档中标记 `--parallel-count` 为 deprecated
   2. 建议在日志中添加警告信息,提示用户使用新参数名
   3. 计划在未来版本中移除旧名称时,需要在 CHANGELOG 中明确说明



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -165,21 +169,30 @@ public final class LoadOptions implements Cloneable {
     public int singleInsertThreads = 8;
 
     @Parameter(names = {"--max-conn"}, arity = 1,
-               description = "Max number of HTTP connections to server")
-    public int maxConnections = CPUS * 4;
+               validateWith = {PositiveValidator.class},
+               description = "Max number of HTTP connections to server. " +
+                             "If left as default and batch-insert-threads is " 
+
+                             "set, this may be auto-adjusted")

Review Comment:
   ⚠️ **默认值变更逻辑不清晰**
   
   变更说明中提到"如果 max-conn/max-conn-per-route 保持默认值,可能会根据此值自动调整",但在代码中看不到自动调整的实现逻辑。
   
   **问题**:
   1. 描述中的"auto-adjusted"功能在哪里实现?
   2. 如果尚未实现,描述信息会误导用户
   
   **建议**:
   如果这个自动调整逻辑在其他地方(如初始化代码)实现,请确保测试覆盖;如果尚未实现,请移除描述中的相关说明或在本PR中实现该逻辑。



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java:
##########
@@ -165,21 +169,30 @@ public final class LoadOptions implements Cloneable {
     public int singleInsertThreads = 8;
 
     @Parameter(names = {"--max-conn"}, arity = 1,
-               description = "Max number of HTTP connections to server")
-    public int maxConnections = CPUS * 4;
+               validateWith = {PositiveValidator.class},
+               description = "Max number of HTTP connections to server. " +
+                             "If left as default and batch-insert-threads is " 
+
+                             "set, this may be auto-adjusted")
+    public int maxConnections = DEFAULT_MAX_CONNECTIONS;
 
     @Parameter(names = {"--max-conn-per-route"}, arity = 1,
-               description = "Max number of HTTP connections to each route")
-    public int maxConnectionsPerRoute = CPUS * 2;
+               validateWith = {PositiveValidator.class},
+               description = "Max number of HTTP connections to each route. " +
+                             "If left as default and batch-insert-threads is " 
+
+                             "set, this may be auto-adjusted")
+    public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE;
 
     @Parameter(names = {"--batch-size"}, arity = 1,
                validateWith = {PositiveValidator.class},
                description = "The number of lines in each submit")
     public int batchSize = 500;
 
-    @Parameter(names = {"--parallel-count"}, arity = 1,
-            description = "The number of parallel read pipelines")
-    public int parallelCount = 1;
+    @Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1,
+               validateWith = {PositiveValidator.class},
+               description = "The number of parallel read pipelines. " +
+                             "Default: auto max(2, cpu/2). " +
+                             "Must be >= 1")

Review Comment:
   ⚠️ **默认并行线程数变更需要性能测试验证**
   
   从固定值 `1` 变更为 `Math.max(2, CPUS/2)` 是一个显著的性能相关变更。
   
   **建议**:
   1. 需要在不同CPU配置(2核、4核、8核、16核+)的机器上进行性能测试
   2. 验证在不同数据规模下的表现(小数据集 vs 大数据集)
   3. 在PR描述的"Verifying these changes"章节补充性能测试结果
   4. 考虑是否需要设置一个上限值,避免在高核心数机器上创建过多线程
   
   **测试建议**:
   ```bash
   # 测试不同并行度下的性能
   for i in 1 2 4 8; do
     time hugegraph-loader --parser-threads $i ...
   done
   ```



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -662,24 +662,21 @@ private List<InputTaskItem> 
prepareTaskItems(List<InputStruct> structs,
     }
 
     private void loadStructs(List<InputStruct> structs) {
-        int parallelCount = this.context.options().parallelCount;
+        int parallelThreads = this.context.options().parallelThreads;

Review Comment:
   🧹 **代码清理建议**
   
   既然参数名已经从 `parallelCount` 重命名为 `parallelThreads`,相应的变量名也应该更新以保持一致性。
   
   ```suggestion
           int parallelThreads = this.context.options().parallelThreads;
           if (structs.size() == 0) {
               return;
           }
   ```
   
   这样可以避免变量名和配置字段名不一致带来的混淆。



##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/task/TaskManager.java:
##########
@@ -137,6 +138,9 @@ public void shutdown() {
 
     public void submitBatch(InputStruct struct, ElementMapping mapping,
                             List<Record> batch) {
+        if (this.context.stopped()) {

Review Comment:
   ⚠️ **新增的条件检查位置不当**
   
   在 `submitBatch` 方法开始处添加 `context.stopped()` 
检查是好的,但这个检查应该在获取信号量**之前**,否则在停止状态下仍然会尝试获取信号量资源。
   
   ```suggestion
       public void submitBatch(InputStruct struct, ElementMapping mapping,
                               List<Record> batch) {
           if (this.context.stopped()) {
               return;
           }
           long start = System.currentTimeMillis();
           try {
               this.batchSemaphore.acquire();
   ```
   
   **理由**:当前代码会在停止状态下浪费信号量资源获取的时间,影响关闭性能。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to