imbajin commented on code in PR #683:
URL:
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2464758678
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,27 +595,152 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
+ } finally {
+ Set<InputReader> usedReaders = tasks.stream()
+ .map(item -> item.reader)
+
.collect(Collectors.toSet());
+ for (InputReader r : readers) {
+ if (!usedReaders.contains(r)) {
+ try {
+ r.close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to close reader: {}",
ex.getMessage());
+ }
+ }
+ }
+ }
+ curIndex += 1;
+ }
+ // sort by seqNumber to allow scatter loading from different sources
+ if (scatter) {
+ tasks.sort(Comparator.comparingInt((InputTaskItem o) ->
o.structIndex)
+ .thenComparingInt(o -> o.seqNumber));
+ }
+
+ return tasks;
+ }
+
+ private void loadStructs(List<InputStruct> structs) {
+ int parallelCount = this.context.options().parallelCount;
+ if (structs.size() == 0) {
+ return;
+ }
+ if (parallelCount <= 0) {
+ parallelCount = Math.min(structs.size(),
Runtime.getRuntime().availableProcessors() * 2);
+ }
+
+ boolean scatter = this.context.options().scatterSources;
+
+ LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+ parallelCount, structs.size(),
this.context.options().startFile,
+ this.context.options().endFile,
+ scatter ? "scatter" : "sequential");
+
+ ExecutorService loadService =
ExecutorUtil.newFixedThreadPool(parallelCount,
+
"loader");
+
+ List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+ List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+ if (taskItems.isEmpty()) {
+ LOG.info("No tasks to execute after filtering");
+ return;
+ }
+
+ for (InputTaskItem item : taskItems) {
+ // Init reader
+ item.reader.init(this.context, item.struct);
+ // Load data from current input mapping
+ loadTasks.add(
+ this.asyncLoadStruct(item.struct, item.reader,
+ loadService));
+ }
+
+ LOG.info("waiting for loading finish {}", loadTasks.size());
+ // wait for finish
+ try {
+ CompletableFuture.allOf(loadTasks.toArray(new
CompletableFuture[0]))
+ .join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParseException) {
+ throw (ParseException) cause;
+ } else if (cause instanceof LoadException) {
+ throw (LoadException) cause;
+ } else if (cause != null) {
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
Review Comment:
‼️ **Critical: 线程安全问题 - InputProgress 的并发访问**
代码中多个线程并发访问 `InputProgress`,虽然使用了 synchronized map,但在某些操作中仍存在竞态条件:
在 `loadStruct()` 方法中:
```java
loadTasks.add(CompletableFuture.supplyAsync(() -> {
// ...
this.context.updateProgress(item.struct, item.reader);
// ...
}, loadService));
```
`updateProgress()` 内部可能涉及多步操作,单纯使用 synchronized map 不足以保证原子性。
**建议:**
1. 使用 `ConcurrentHashMap` 配合原子操作
2. 或在更高层次(方法级别)加锁
3. 确保 progress 更新的原子性,特别是在读-修改-写场景中
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,27 +595,152 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
+ } finally {
+ Set<InputReader> usedReaders = tasks.stream()
+ .map(item -> item.reader)
+
.collect(Collectors.toSet());
+ for (InputReader r : readers) {
+ if (!usedReaders.contains(r)) {
Review Comment:
⚠️ **Important: Reader 资源清理逻辑不完善**
在 `prepareTaskItems()` 的 finally 块中:
```java
} finally {
Set<InputReader> usedReaders = tasks.stream()
.map(item -> item.reader)
.collect(Collectors.toSet());
for (InputReader r : readers) {
if (!usedReaders.contains(r)) {
try {
r.close();
} catch (Exception ex) {
LOG.warn("Failed to close reader: {}", ex.getMessage());
}
}
}
}
```
问题:
1. 只关闭了 "未使用" 的 reader,但使用中的 reader 在哪里关闭?
2. 如果 `reader.init()` 失败,reader 可能处于半初始化状态,仍需清理
3. 异常被吞掉(只记录warn),可能隐藏重要的资源释放失败
**建议:**
- 明确 reader 的生命周期管理责任
- 使用 try-with-resources 或确保在任务完成后统一清理
- 考虑是否需要一个 Reader 注册表来跟踪所有创建的 reader
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]