Radeity commented on code in PR #248:
URL:
https://github.com/apache/incubator-hugegraph-computer/pull/248#discussion_r1207693574
##########
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java:
##########
@@ -17,35 +17,66 @@
package org.apache.hugegraph.computer.core.input;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.hugegraph.computer.core.common.ComputerContext;
+import org.apache.hugegraph.computer.core.common.exception.ComputerException;
+import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.manager.Manager;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
+import org.apache.hugegraph.util.ExecutorUtil;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
public class WorkerInputManager implements Manager {
+ private static final Logger LOG = Log.logger(WorkerInputManager.class);
+ private static final String PREFIX = "input-send-executor-%s";
+
public static final String NAME = "worker_input";
/*
* Fetch vertices and edges from the data source and convert them
* to computer-vertices and computer-edges
*/
- private final LoadService loadService;
+ private final List<LoadService> loadServices;
+
+ private final ExecutorService sendExecutor;
+ private final int sendThreadNum;
+
/*
* Send vertex/edge or message to target worker
*/
private final MessageSendManager sendManager;
public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
- this.loadService = new LoadService(context);
this.sendManager = sendManager;
+
+ this.sendThreadNum = this.inputSendThreadNum(context.config());
+ this.sendExecutor =
ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
+ LOG.info("Created parallel sending thread pool, thread num: {}",
+ sendThreadNum);
+
+ this.loadServices = new ArrayList<>(this.sendThreadNum);
+ for (int i = 0; i < this.sendThreadNum; i++) {
+ this.loadServices.add(new LoadService(context));
Review Comment:
Hi, @javeme , thanks for your comment, I move `fetcher` into
`IteratorFromVertex` and `IteratorFromEdge`, also replace creation of multi
`LoadService` as you suggested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]