This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d10a84 feat: implement fast-failover for MessageRecvManager and DataClientManager (#243)
f7d10a84 is described below

commit f7d10a845de68cd84fe01f247b8fc3d887edbb67
Author: Aaron Wang <[email protected]>
AuthorDate: Thu May 25 09:24:52 2023 +0800

    feat: implement fast-failover for MessageRecvManager and DataClientManager (#243)
---
 .../computer/core/network/DataClientManager.java   |  3 +-
 .../netty/ChannelFutureListenerOnWrite.java        |  2 +-
 .../computer/core/receiver/MessageRecvManager.java | 59 ++++++++++------------
 .../computer/core/sender/MessageSendManager.java   |  6 +--
 .../computer/core/sender/MessageSender.java        |  8 +++
 .../computer/core/sender/QueuedMessageSender.java  | 10 ++++
 .../computer/core/compute/MockMessageSender.java   |  7 +++
 7 files changed, 58 insertions(+), 37 deletions(-)

diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
index 34f7043a..8e90c794 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java
@@ -118,10 +118,9 @@ public class DataClientManager implements Manager {
         @Override
         public void exceptionCaught(TransportException cause,
                                     ConnectionId connectionId) {
-            // TODO: implement failover
             LOG.error("Channel for connectionId {} occurred exception",
                       connectionId, cause);
-            DataClientManager.this.connManager.closeClient(connectionId);
+            DataClientManager.this.sender.transportExceptionCaught(cause, connectionId);
         }
     }
 }
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
index c3715574..9e54a169 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java
@@ -55,7 +55,7 @@ public class ChannelFutureListenerOnWrite implements ChannelFutureListener {
         }
     }
 
-    public  void onSuccess(Channel channel, ChannelFuture future) {
+    public void onSuccess(Channel channel, ChannelFuture future) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Successfully send data to '{}'",
                       TransportUtil.remoteAddress(channel));
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
index 5e51320a..b77ffa80 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java
@@ -18,8 +18,11 @@
 package org.apache.hugegraph.computer.core.receiver;
 
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hugegraph.computer.core.common.ComputerContext;
 import org.apache.hugegraph.computer.core.common.Constants;
@@ -60,7 +63,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
 
     private int workerCount;
     private int expectedFinishMessages;
-    private CountDownLatch finishMessagesLatch;
+    private CompletableFuture<Void> finishMessagesFuture;
+    private AtomicInteger finishMessagesCount;
+
     private long waitFinishMessagesTimeout;
     private long superstep;
 
@@ -71,6 +76,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
         this.fileManager = fileManager;
         this.sortManager = sortManager;
         this.superstep = Constants.INPUT_SUPERSTEP;
+        this.finishMessagesCount = new AtomicInteger();
     }
 
     @Override
@@ -90,8 +96,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
         this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
         // One for vertex and one for edge.
         this.expectedFinishMessages = this.workerCount * 2;
-        this.finishMessagesLatch = new CountDownLatch(
-                                   this.expectedFinishMessages);
+        this.finishMessagesFuture = new CompletableFuture<>();
+        this.finishMessagesCount.set(this.expectedFinishMessages);
+
         this.waitFinishMessagesTimeout = config.get(
              ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT);
     }
@@ -103,8 +110,9 @@ public class MessageRecvManager implements Manager, MessageHandler {
         this.messagePartitions = new ComputeMessageRecvPartitions(
                                  this.context, fileGenerator, this.sortManager);
         this.expectedFinishMessages = this.workerCount;
-        this.finishMessagesLatch = new CountDownLatch(
-                                   this.expectedFinishMessages);
+        this.finishMessagesFuture = new CompletableFuture<>();
+        this.finishMessagesCount.set(this.expectedFinishMessages);
+
         this.superstep = superstep;
 
         if (this.superstep == Constants.INPUT_SUPERSTEP + 1) {
@@ -138,35 +146,21 @@ public class MessageRecvManager implements Manager, MessageHandler {
     @Override
     public void exceptionCaught(TransportException cause,
                                 ConnectionId connectionId) {
-        // TODO: implement failover
-        LOG.warn("Exception caught for connection:{}, root cause:",
+        LOG.error("Exception caught for connection:{}, root cause:",
                  connectionId, cause);
+        this.finishMessagesFuture.completeExceptionally(cause);
     }
 
     public void waitReceivedAllMessages() {
         try {
-            boolean status = this.finishMessagesLatch.await(
-                             this.waitFinishMessagesTimeout,
-                             TimeUnit.MILLISECONDS);
-            if (!status) {
-                throw new ComputerException(
-                          "Expect %s finish-messages received in %s ms, " +
-                          "%s absence in superstep %s",
-                          this.expectedFinishMessages,
-                          this.waitFinishMessagesTimeout,
-                          this.finishMessagesLatch.getCount(),
-                          this.superstep);
-            }
-        } catch (InterruptedException e) {
-            throw new ComputerException(
-                      "Thread is interrupted while waiting %s " +
-                      "finish-messages received in %s ms, " +
-                      "%s absence in superstep %s",
-                      e,
-                      this.expectedFinishMessages,
-                      this.waitFinishMessagesTimeout,
-                      this.finishMessagesLatch.getCount(),
-                      this.superstep);
+            this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            throw new ComputerException("Time out while waiting %s finish-messages " +
+                    "received in %s ms in superstep %s",
+                    this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new ComputerException("Error while waiting %s finish-messages in superstep %s",
+                    this.expectedFinishMessages, this.superstep, e);
         }
     }
 
@@ -214,7 +208,10 @@ public class MessageRecvManager implements Manager, MessageHandler {
     @Override
     public void onFinished(ConnectionId connectionId) {
         LOG.debug("ConnectionId {} finished", connectionId);
-        this.finishMessagesLatch.countDown();
+        int currentCount = this.finishMessagesCount.decrementAndGet();
+        if (currentCount == 0) {
+            this.finishMessagesFuture.complete(null);
+        }
     }
 
     /**
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
index ee981c02..48fa687f 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java
@@ -154,7 +154,7 @@ public class MessageSendManager implements Manager {
     }
 
     /**
-     * Finsih send message, send the last buffer and put an END signal
+     * Finish send message, send the last buffer and put an END signal
      * into queue
      * @param type the message type
      */
@@ -277,10 +277,10 @@ public class MessageSendManager implements Manager {
             }
         } catch (TimeoutException e) {
             throw new ComputerException("Timeout(%sms) to wait for " +
-                                        "controling message(%s) to finished",
+                                        "controlling message(%s) to finished",
                                         e, timeout, type);
         } catch (InterruptedException | ExecutionException e) {
-            throw new ComputerException("Failed to wait for controling " +
+            throw new ComputerException("Failed to wait for controlling " +
                                         "message(%s) to finished", e, type);
         }
     }
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
index 864951e8..a700b22c 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java
@@ -19,6 +19,8 @@ package org.apache.hugegraph.computer.core.sender;
 
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.hugegraph.computer.core.common.exception.TransportException;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
 import org.apache.hugegraph.computer.core.network.message.MessageType;
 
 public interface MessageSender {
@@ -37,4 +39,10 @@ public interface MessageSender {
      * @param message message payload
      */
     void send(int workerId, QueuedMessage message) throws InterruptedException;
+
+    /**
+     * Invoked when the channel associated with the given connectionId has
+     * an exception is thrown processing message.
+     */
+    void transportExceptionCaught(TransportException cause, ConnectionId connectionId);
 }
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
index 810d3b9a..b2006b88 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java
@@ -24,6 +24,7 @@ import org.apache.hugegraph.computer.core.common.exception.ComputerException;
 import org.apache.hugegraph.computer.core.common.exception.TransportException;
 import org.apache.hugegraph.computer.core.config.ComputerOptions;
 import org.apache.hugegraph.computer.core.config.Config;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
 import org.apache.hugegraph.computer.core.network.TransportClient;
 import org.apache.hugegraph.computer.core.network.message.MessageType;
 import org.apache.hugegraph.concurrent.BarrierEvent;
@@ -103,6 +104,15 @@ public class QueuedMessageSender implements MessageSender {
         channel.queue.put(message);
     }
 
+    @Override
+    public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
+        for (WorkerChannel channel : this.channels) {
+            if (channel.client.connectionId().equals(connectionId)) {
+                channel.futureRef.get().completeExceptionally(cause);
+            }
+        }
+    }
+
     public Runnable notBusyNotifier() {
         /*
          * DataClientHandler.sendAvailable() will call it when client
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
index 3535fd8a..a1969200 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java
@@ -19,6 +19,8 @@ package org.apache.hugegraph.computer.core.compute;
 
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.hugegraph.computer.core.common.exception.TransportException;
+import org.apache.hugegraph.computer.core.network.ConnectionId;
 import org.apache.hugegraph.computer.core.network.message.MessageType;
 import org.apache.hugegraph.computer.core.sender.MessageSender;
 import org.apache.hugegraph.computer.core.sender.QueuedMessage;
@@ -36,4 +38,9 @@ public class MockMessageSender implements MessageSender {
     public void send(int workerId, QueuedMessage message) {
         // pass
     }
+
+    @Override
+    public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
+        // pass
+    }
 }

Reply via email to