imbajin commented on code in PR #683:
URL:
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2458901813
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,25 +592,150 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
+ } finally {
+ Set<InputReader> usedReaders = tasks.stream()
+ .map(item -> item.reader)
+
.collect(Collectors.toSet());
+ for (InputReader r : readers) {
+ if (!usedReaders.contains(r)) {
+ try {
+ r.close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to close reader: {}",
ex.getMessage());
+ }
+ }
+ }
}
+ curIndex += 1;
+ }
+ // sort by seqNumber to allow scatter loading from different sources
+ if (scatter) {
+ tasks.sort(Comparator.comparingInt((InputTaskItem o) ->
o.structIndex)
+ .thenComparingInt(o -> o.seqNumber));
}
+
+ return tasks;
+ }
+
+ private void loadStructs(List<InputStruct> structs) {
+ int parallelCount = this.context.options().parallelCount;
+ if (structs.size() == 0) {
+ return;
+ }
+ if (parallelCount <= 0) {
Review Comment:
‼️ **线程池未关闭**: 在 `loadStructs` 中创建的 `loadService` ExecutorService 从未被显式关闭。
**问题**: 即使任务完成或发生异常,线程池也不会自动关闭,导致线程泄漏和 JVM 无法正常退出。
**建议**: 使用 try-finally 确保线程池被正确关闭:
```java
ExecutorService loadService = ExecutorUtil.newFixedThreadPool(parallelCount,
"loader");
try {
// ... all loading logic
} finally {
loadService.shutdown();
try {
if (!loadService.awaitTermination(60, TimeUnit.SECONDS)) {
loadService.shutdownNow();
}
} catch (InterruptedException e) {
loadService.shutdownNow();
Thread.currentThread().interrupt();
}
}
```
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,25 +592,150 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
+ } finally {
+ Set<InputReader> usedReaders = tasks.stream()
+ .map(item -> item.reader)
+
.collect(Collectors.toSet());
+ for (InputReader r : readers) {
+ if (!usedReaders.contains(r)) {
+ try {
+ r.close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to close reader: {}",
ex.getMessage());
+ }
+ }
+ }
}
+ curIndex += 1;
+ }
+ // sort by seqNumber to allow scatter loading from different sources
+ if (scatter) {
+ tasks.sort(Comparator.comparingInt((InputTaskItem o) ->
o.structIndex)
+ .thenComparingInt(o -> o.seqNumber));
}
+
+ return tasks;
+ }
+
+ private void loadStructs(List<InputStruct> structs) {
+ int parallelCount = this.context.options().parallelCount;
+ if (structs.size() == 0) {
+ return;
+ }
+ if (parallelCount <= 0) {
+ parallelCount = Math.min(structs.size(),
Runtime.getRuntime().availableProcessors() * 2);
+ }
+
+ boolean scatter = this.context.options().scatterSources;
+
+ LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+ parallelCount, structs.size(),
this.context.options().startFile,
+ this.context.options().endFile,
+ scatter ? "scatter" : "sequential");
+
+ ExecutorService loadService =
ExecutorUtil.newFixedThreadPool(parallelCount,
+
"loader");
+
+ List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+ List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+ if (taskItems.isEmpty()) {
+ LOG.info("No tasks to execute after filtering");
+ return;
+ }
+
+ for (InputTaskItem item : taskItems) {
+ // Init reader
+ item.reader.init(this.context, item.struct);
+ // Load data from current input mapping
+ loadTasks.add(
+ this.asyncLoadStruct(item.struct, item.reader,
+ loadService));
+ }
+
+ LOG.info("waiting for loading finish {}", loadTasks.size());
+ // wait for finish
+ try {
Review Comment:
⚠️ **并发安全问题**: `InputProgress` 的并发访问可能存在竞态条件。多个线程同时调用 `markLoaded` 时,内部的 Map
操作需要保证线程安全。
**建议**: 检查 `InputProgress` 实现是否使用了 `ConcurrentHashMap` 或适当的同步机制。如果使用普通
HashMap,需要改为:
```java
private final Map<String, InputItemProgress> progressMap = new
ConcurrentHashMap<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]