javeme commented on code in PR #248:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/248#discussion_r1207966503
##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java:
##########
@@ -61,35 +89,56 @@ public void init(Config config) {
@Override
public void close(Config config) {
- this.loadService.close();
this.sendManager.close(config);
+ this.sendExecutor.shutdown();
}
public void service(InputSplitRpcService rpcService) {
this.loadService.rpcService(rpcService);
}
/**
- * TODO: Load vertices and edges parallel.
* When this method finish, it means that all vertices and edges are sent,
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
this.sendManager.startSend(MessageType.VERTEX);
- Iterator<Vertex> iterator =
this.loadService.createIteratorFromVertex();
- while (iterator.hasNext()) {
- Vertex vertex = iterator.next();
- this.sendManager.sendVertex(vertex);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ futures.add(send(this.sendManager::sendVertex,
+ this.loadService::createIteratorFromVertex));
}
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).exceptionally(e -> {
+ throw new ComputerException("An exception occurred during parallel
" +
+ "sending vertices", e);
+ }).join();
this.sendManager.finishSend(MessageType.VERTEX);
+ futures.clear();
+
this.sendManager.startSend(MessageType.EDGE);
- iterator = this.loadService.createIteratorFromEdge();
- while (iterator.hasNext()) {
- Vertex vertex = iterator.next();
- this.sendManager.sendEdge(vertex);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ futures.add(send(this.sendManager::sendEdge,
+ this.loadService::createIteratorFromEdge));
Review Comment:
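Ditto: prefer to align the wrapped argument line with `this.sendManager`, as for the vertex-sending call.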
##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java:
##########
@@ -61,35 +89,56 @@ public void init(Config config) {
@Override
public void close(Config config) {
- this.loadService.close();
this.sendManager.close(config);
+ this.sendExecutor.shutdown();
}
public void service(InputSplitRpcService rpcService) {
this.loadService.rpcService(rpcService);
}
/**
- * TODO: Load vertices and edges parallel.
* When this method finish, it means that all vertices and edges are sent,
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
this.sendManager.startSend(MessageType.VERTEX);
- Iterator<Vertex> iterator =
this.loadService.createIteratorFromVertex();
- while (iterator.hasNext()) {
- Vertex vertex = iterator.next();
- this.sendManager.sendVertex(vertex);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ futures.add(send(this.sendManager::sendVertex,
+ this.loadService::createIteratorFromVertex));
Review Comment:
Prefer to align the wrapped argument line with `this.sendManager`.
##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java:
##########
@@ -78,30 +67,38 @@ public void rpcService(InputSplitRpcService rpcService) {
}
public Iterator<Vertex> createIteratorFromVertex() {
- return new IteratorFromVertex();
+ return new IteratorFromVertex(this.config, this.rpcService);
}
public Iterator<Vertex> createIteratorFromEdge() {
- return new IteratorFromEdge();
+ return new IteratorFromEdge(this.config, this.rpcService);
}
- private class IteratorFromVertex implements Iterator<Vertex> {
+ private class IteratorFromVertex implements Iterator<Vertex>,
AutoCloseable {
private InputSplit currentSplit;
- public IteratorFromVertex() {
+ /*
+ * GraphFetcher include:
+ * VertexFetcher vertexFetcher;
+ * EdgeFetcher edgeFetcher;
+ */
+ private GraphFetcher fetcher;
+
+ public IteratorFromVertex(Config config, InputSplitRpcService
rpcService) {
Review Comment:
Can we just pass in a `GraphFetcher fetcher` argument here, instead of `config` and `rpcService`?
##########
computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java:
##########
@@ -198,6 +198,14 @@ public static synchronized ComputerOptions instance() {
""
);
+ public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
+ new ConfigOption<>(
+ "input.send_thread_nums",
+ "The number of threads for vertex and edge parallel
sending.",
Review Comment:
prefer `The number of threads for parallel sending vertex or edge.`
##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java:
##########
@@ -78,30 +67,38 @@ public void rpcService(InputSplitRpcService rpcService) {
}
public Iterator<Vertex> createIteratorFromVertex() {
- return new IteratorFromVertex();
+ return new IteratorFromVertex(this.config, this.rpcService);
}
public Iterator<Vertex> createIteratorFromEdge() {
- return new IteratorFromEdge();
+ return new IteratorFromEdge(this.config, this.rpcService);
}
- private class IteratorFromVertex implements Iterator<Vertex> {
+ private class IteratorFromVertex implements Iterator<Vertex>,
AutoCloseable {
private InputSplit currentSplit;
- public IteratorFromVertex() {
+ /*
+ * GraphFetcher include:
+ * VertexFetcher vertexFetcher;
+ * EdgeFetcher edgeFetcher;
+ */
Review Comment:
also update the comment here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]