This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new f7d10a84 feat: implement fast-failover for MessageRecvManager and DataClientManager (#243)
f7d10a84 is described below
commit f7d10a845de68cd84fe01f247b8fc3d887edbb67
Author: Aaron Wang <[email protected]>
AuthorDate: Thu May 25 09:24:52 2023 +0800
feat: implement fast-failover for MessageRecvManager and DataClientManager (#243)
---
.../computer/core/network/DataClientManager.java | 3 +-
.../netty/ChannelFutureListenerOnWrite.java | 2 +-
.../computer/core/receiver/MessageRecvManager.java | 59 ++++++++++------------
.../computer/core/sender/MessageSendManager.java | 6 +--
.../computer/core/sender/MessageSender.java | 8 +++
.../computer/core/sender/QueuedMessageSender.java | 10 ++++
.../computer/core/compute/MockMessageSender.java | 7 +++
7 files changed, 58 insertions(+), 37 deletions(-)
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
index 34f7043a..8e90c794 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
@@ -118,10 +118,9 @@ public class DataClientManager implements Manager {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
- // TODO: implement failover
LOG.error("Channel for connectionId {} occurred exception",
connectionId, cause);
- DataClientManager.this.connManager.closeClient(connectionId);
+ DataClientManager.this.sender.transportExceptionCaught(cause, connectionId);
}
}
}
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
index c3715574..9e54a169 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
@@ -55,7 +55,7 @@ public class ChannelFutureListenerOnWrite implements ChannelFutureListener {
}
}
- public void onSuccess(Channel channel, ChannelFuture future) {
+ public void onSuccess(Channel channel, ChannelFuture future) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully send data to '{}'",
TransportUtil.remoteAddress(channel));
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
index 5e51320a..b77ffa80 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
@@ -18,8 +18,11 @@
package org.apache.hugegraph.computer.core.receiver;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.Constants;
@@ -60,7 +63,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
private int workerCount;
private int expectedFinishMessages;
- private CountDownLatch finishMessagesLatch;
+ private CompletableFuture<Void> finishMessagesFuture;
+ private final AtomicInteger finishMessagesCount;
+
private long waitFinishMessagesTimeout;
private long superstep;
@@ -71,6 +76,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
this.fileManager = fileManager;
this.sortManager = sortManager;
this.superstep = Constants.INPUT_SUPERSTEP;
+ this.finishMessagesCount = new AtomicInteger();
}
@Override
@@ -90,8 +96,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
- this.finishMessagesLatch = new CountDownLatch(
- this.expectedFinishMessages);
+ this.finishMessagesFuture = new CompletableFuture<>();
+ this.finishMessagesCount.set(this.expectedFinishMessages);
+
this.waitFinishMessagesTimeout = config.get(
ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT);
}
@@ -103,8 +110,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator,
this.sortManager);
this.expectedFinishMessages = this.workerCount;
- this.finishMessagesLatch = new CountDownLatch(
- this.expectedFinishMessages);
+ this.finishMessagesFuture = new CompletableFuture<>();
+ this.finishMessagesCount.set(this.expectedFinishMessages);
+
this.superstep = superstep;
if (this.superstep == Constants.INPUT_SUPERSTEP + 1) {
@@ -138,35 +146,33 @@ public class MessageRecvManager implements Manager, MessageHandler {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
- // TODO: implement failover
- LOG.warn("Exception caught for connection:{}, root cause:",
+ LOG.error("Exception caught for connection:{}, root cause:",
connectionId, cause);
+ // Fail fast: wake up waitReceivedAllMessages() instead of letting it time out
+ this.finishMessagesFuture.completeExceptionally(cause);
}
public void waitReceivedAllMessages() {
try {
- boolean status = this.finishMessagesLatch.await(
- this.waitFinishMessagesTimeout,
- TimeUnit.MILLISECONDS);
- if (!status) {
- throw new ComputerException(
- "Expect %s finish-messages received in %s ms, " +
- "%s absence in superstep %s",
- this.expectedFinishMessages,
- this.waitFinishMessagesTimeout,
- this.finishMessagesLatch.getCount(),
- this.superstep);
- }
- } catch (InterruptedException e) {
- throw new ComputerException(
- "Thread is interrupted while waiting %s " +
- "finish-messages received in %s ms, " +
- "%s absence in superstep %s",
- e,
- this.expectedFinishMessages,
- this.waitFinishMessagesTimeout,
- this.finishMessagesLatch.getCount(),
- this.superstep);
+ this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // Pass the cause right after the message (message, cause, args...) so it is kept
+ throw new ComputerException("Time out while waiting %s finish-messages " +
+ "received in %s ms, %s absence in superstep %s",
+ e, this.expectedFinishMessages,
+ this.waitFinishMessagesTimeout,
+ this.finishMessagesCount.get(), this.superstep);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers can still observe it
+ Thread.currentThread().interrupt();
+ throw new ComputerException("Interrupted while waiting %s finish-messages " +
+ "in superstep %s",
+ e, this.expectedFinishMessages, this.superstep);
+ } catch (ExecutionException e) {
+ // The future was completed exceptionally by exceptionCaught(): fail fast
+ throw new ComputerException("Error while waiting %s finish-messages " +
+ "in superstep %s",
+ e, this.expectedFinishMessages, this.superstep);
}
}
@@ -214,7 +220,10 @@ public class MessageRecvManager implements Manager, MessageHandler {
@Override
public void onFinished(ConnectionId connectionId) {
LOG.debug("ConnectionId {} finished", connectionId);
- this.finishMessagesLatch.countDown();
+ int currentCount = this.finishMessagesCount.decrementAndGet();
+ if (currentCount == 0) {
+ this.finishMessagesFuture.complete(null);
+ }
}
/**
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
index ee981c02..48fa687f 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
@@ -154,7 +154,7 @@ public class MessageSendManager implements Manager {
}
/**
- * Finsih send message, send the last buffer and put an END signal
+ * Finish sending messages: send the last buffer and put an END signal
* into queue
* @param type the message type
*/
@@ -277,10 +277,10 @@ public class MessageSendManager implements Manager {
}
} catch (TimeoutException e) {
throw new ComputerException("Timeout(%sms) to wait for " +
- "controling message(%s) to finished",
+ "controlling message(%s) to finish",
e, timeout, type);
} catch (InterruptedException | ExecutionException e) {
- throw new ComputerException("Failed to wait for controling " +
- "message(%s) to finished", e, type);
+ throw new ComputerException("Failed to wait for controlling " +
+ "message(%s) to finish", e, type);
}
}
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
index 864951e8..a700b22c 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
@@ -19,6 +19,8 @@ package org.apache.hugegraph.computer.core.sender;
import java.util.concurrent.CompletableFuture;
+import org.apache.hugegraph.computer.core.common.exception.TransportException;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;
public interface MessageSender {
@@ -37,4 +39,12 @@ public interface MessageSender {
* @param message message payload
*/
void send(int workerId, QueuedMessage message) throws InterruptedException;
+
+ /**
+ * Invoked when an exception is thrown while processing messages on the
+ * channel associated with the given connectionId.
+ * @param cause the transport exception that was caught
+ * @param connectionId the connection on which the exception occurred
+ */
+ void transportExceptionCaught(TransportException cause, ConnectionId connectionId);
}
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
index 810d3b9a..b2006b88 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
@@ -24,6 +24,7 @@ import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.TransportClient;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.concurrent.BarrierEvent;
@@ -103,6 +104,19 @@ public class QueuedMessageSender implements MessageSender {
channel.queue.put(message);
}
+ @Override
+ public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
+ for (WorkerChannel channel : this.channels) {
+ if (channel.client.connectionId().equals(connectionId)) {
+ CompletableFuture<?> future = channel.futureRef.get();
+ // NOTE(review): assumes futureRef can be null between control messages; avoid NPE
+ if (future != null) {
+ future.completeExceptionally(cause);
+ }
+ }
+ }
+ }
+
public Runnable notBusyNotifier() {
/*
* DataClientHandler.sendAvailable() will call it when client
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
index 3535fd8a..a1969200 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
@@ -19,6 +19,8 @@ package org.apache.hugegraph.computer.core.compute;
import java.util.concurrent.CompletableFuture;
+import org.apache.hugegraph.computer.core.common.exception.TransportException;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.sender.MessageSender;
import org.apache.hugegraph.computer.core.sender.QueuedMessage;
@@ -36,4 +38,9 @@ public class MockMessageSender implements MessageSender {
public void send(int workerId, QueuedMessage message) {
// pass
}
+
+ @Override
+ public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
+ // pass
+ }
}