imbajin commented on code in PR #683:
URL:
https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2458901417
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,25 +592,150 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ ArrayList<InputReader> readers = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+ readers.addAll(readerList);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
Review Comment:
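‼️ **Resource leak risk**: In `prepareTaskItems`, when an `InitException` is thrown, the `InputReader` instances that were already created may not be closed properly.
**Problem**: If an exception is thrown partway through the loop, readers that were created successfully before it may never be closed, which leaks resources.
**Suggestion**: Guard reader creation with a try block (try-finally, or a catch that rethrows) so that resources are also cleaned up when an exception occurs: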
```java
List<InputReader> readers = new ArrayList<>();
try {
    InputReader reader = InputReader.create(struct.input());
    readers.add(reader);
    List<InputReader> readerList = reader.multiReaders() ?
                                   reader.split() :
                                   ImmutableList.of(reader);
    // ... rest of logic
} catch (InitException e) {
    // Clean up any created readers
    for (InputReader r : readers) {
        try {
            r.close();
        } catch (Exception ex) {
            /* log */
        }
    }
    throw new LoadException("Failed to init input reader", e);
}
```
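To make the failure mode concrete, here is a minimal, self-contained sketch of the same cleanup pattern. It does not use the real hugegraph-loader classes: `FakeReader`, `openAll`, `CREATED`, and the "names starting with `bad` fail" rule are all hypothetical stand-ins invented for illustration, and `IllegalStateException` stands in for `InitException`. It only shows the idea that readers opened before the failure get closed before the exception propagates.

```java
import java.util.ArrayList;
import java.util.List;

final class ReaderCleanupSketch {

    /** Hypothetical stand-in for InputReader; records whether close() ran. */
    static final class FakeReader implements AutoCloseable {
        final String name;
        boolean closed = false;

        FakeReader(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }

    /** Every reader ever created, kept only so callers can inspect it later. */
    static final List<FakeReader> CREATED = new ArrayList<>();

    /**
     * Opens one reader per name. A name starting with "bad" simulates an
     * init failure (assumption made for this sketch only). On failure,
     * all readers opened so far are closed before the exception is rethrown.
     */
    static List<FakeReader> openAll(List<String> names) {
        List<FakeReader> opened = new ArrayList<>();
        try {
            for (String name : names) {
                if (name.startsWith("bad")) {
                    throw new IllegalStateException("Failed to init " + name);
                }
                FakeReader reader = new FakeReader(name);
                CREATED.add(reader);
                opened.add(reader);
            }
            return opened;
        } catch (RuntimeException e) {
            // Close what was opened; keep close failures attached to the cause
            for (FakeReader reader : opened) {
                try {
                    reader.close();
                } catch (RuntimeException closeError) {
                    e.addSuppressed(closeError);
                }
            }
            throw e;
        }
    }
}
```

Calling `ReaderCleanupSketch.openAll(Arrays.asList("a", "b", "bad", "c"))` throws, and the two readers created before `"bad"` end up closed; a call with only valid names returns open readers that the caller is then responsible for closing. Attaching close failures via `addSuppressed` is one design choice; logging them, as the snippet above suggests, would work as well.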
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]