javeme commented on code in PR #248:
URL: 
https://github.com/apache/incubator-hugegraph-computer/pull/248#discussion_r1207978859


##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java:
##########
@@ -78,30 +67,38 @@ public void rpcService(InputSplitRpcService rpcService) {
     }
 
     public Iterator<Vertex> createIteratorFromVertex() {
-        return new IteratorFromVertex();
+        return new IteratorFromVertex(this.config, this.rpcService);
     }
 
     public Iterator<Vertex> createIteratorFromEdge() {
-        return new IteratorFromEdge();
+        return new IteratorFromEdge(this.config, this.rpcService);
     }
 
-    private class IteratorFromVertex implements Iterator<Vertex> {
+    private class IteratorFromVertex implements Iterator<Vertex>, AutoCloseable {
 
         private InputSplit currentSplit;
 
-        public IteratorFromVertex() {
+        /*
+         * GraphFetcher include:
+         *   VertexFetcher vertexFetcher;
+         *   EdgeFetcher edgeFetcher;
+         */
+        private GraphFetcher fetcher;
+
+        public IteratorFromVertex(Config config, InputSplitRpcService rpcService) {

Review Comment:
   We can create the GraphFetcher when `new IteratorFromVertex()` is called.
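
One possible reading of this suggestion, as a minimal sketch: keep the no-argument constructor and let the inner class build its own fetcher from the enclosing service's state, so each iterator owns (and closes) its own fetcher. `LoadServiceSketch`, `Fetcher`, and the `data` field are hypothetical stand-ins for `LoadService`, `GraphFetcher`, and the `config`/`rpcService` fields; they are not the project's real API.

```java
import java.util.Iterator;
import java.util.List;

class LoadServiceSketch {

    /** Hypothetical stand-in for GraphFetcher (assumption, not the real interface). */
    static class Fetcher implements AutoCloseable {
        private final Iterator<String> vertices;
        boolean closed = false;

        Fetcher(List<String> data) {
            this.vertices = data.iterator();
        }

        Iterator<String> vertices() {
            return this.vertices;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }

    // Stands in for the config and rpcService fields of the outer service.
    private final List<String> data;

    LoadServiceSketch(List<String> data) {
        this.data = data;
    }

    IteratorFromVertex createIteratorFromVertex() {
        return new IteratorFromVertex();
    }

    /** Non-static inner class: it can read the outer fields directly. */
    class IteratorFromVertex implements Iterator<String>, AutoCloseable {
        private final Fetcher fetcher;

        IteratorFromVertex() {
            // The fetcher is created here, one per iterator instance.
            this.fetcher = new Fetcher(LoadServiceSketch.this.data);
        }

        Fetcher fetcher() {
            return this.fetcher;
        }

        @Override
        public boolean hasNext() {
            return this.fetcher.vertices().hasNext();
        }

        @Override
        public String next() {
            return this.fetcher.vertices().next();
        }

        @Override
        public void close() {
            // Closing the iterator releases only its own fetcher.
            this.fetcher.close();
        }
    }
}
```

With this shape, callers of `createIteratorFromVertex()` do not need to pass anything in, and two iterators created by different sender threads never share a fetcher.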



##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java:
##########
@@ -61,35 +89,56 @@ public void init(Config config) {
 
     @Override
     public void close(Config config) {
-        this.loadService.close();
         this.sendManager.close(config);
+        this.sendExecutor.shutdown();
     }
 
     public void service(InputSplitRpcService rpcService) {
         this.loadService.rpcService(rpcService);
     }
 
     /**
-     * TODO: Load vertices and edges parallel.
      * When this method finish, it means that all vertices and edges are sent,
      * but there is no guarantee that all of them has been received.
      */
     public void loadGraph() {
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
         this.sendManager.startSend(MessageType.VERTEX);
-        Iterator<Vertex> iterator = this.loadService.createIteratorFromVertex();
-        while (iterator.hasNext()) {
-            Vertex vertex = iterator.next();
-            this.sendManager.sendVertex(vertex);
+        for (int i = 0; i < this.sendThreadNum; i++) {
+            futures.add(send(this.sendManager::sendVertex,
+                    this.loadService::createIteratorFromVertex));

Review Comment:
   OK, could you please paste the error?
   
   Also, we would prefer splitting this into more lines/steps for clearer code, in this style:
   ```java
   CompletableFuture<?> future = send(this.sendManager::sendVertex,
                                      this.loadService::createIteratorFromVertex);
   futures.add(future);
   ```
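
For context, a minimal self-contained sketch of how the collected futures might then be awaited. The `send` helper below only mirrors the shape of the call in the diff (a sender plus an iterator supplier); its signature, the `sendAll` wrapper, and the executor handling are assumptions for illustration, not the PR's actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ParallelSendSketch {

    /** Hypothetical helper: drain one iterator into the sender on the executor. */
    static <T> CompletableFuture<Void> send(ExecutorService executor,
                                            Consumer<T> sender,
                                            Supplier<Iterator<T>> iteratorSupplier) {
        return CompletableFuture.runAsync(() -> {
            // Each task obtains its own iterator from the supplier.
            Iterator<T> iterator = iteratorSupplier.get();
            while (iterator.hasNext()) {
                sender.accept(iterator.next());
            }
        }, executor);
    }

    /** Start threadNum send tasks, then block until all of them finish. */
    static <T> void sendAll(int threadNum,
                            Consumer<T> sender,
                            Supplier<Iterator<T>> iteratorSupplier) {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        try {
            List<CompletableFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                CompletableFuture<?> future = send(executor, sender,
                                                   iteratorSupplier);
                futures.add(future);
            }
            // join() rethrows (wrapped) the first failure from any task.
            CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
    }
}
```

Keeping `future` as a named local, as suggested above, also gives a natural place to attach logging or error handling per task later.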



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

