This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new dd98bf36b6 [MINOR] Federated retry on connection error
dd98bf36b6 is described below

commit dd98bf36b6040c8754b7dc8508c6f5d7268f2b45
Author: baunsgaard <[email protected]>
AuthorDate: Mon Jul 17 10:55:45 2023 +0200

    [MINOR] Federated retry on connection error
    
    This commit adds a small retry block for federated requests that
    fail due to connection issues. In practice, this means that if a
    request does not get through (a ConnectException is caught), we retry
    it with an increasing wait time: 200 ms before the first retry, growing
    by 200 ms per retry up to 800 ms, for at most 4 retries (5 attempts).
    In general, this made the tests run more smoothly for me, since we no
    longer fail when workers have not been started yet; instead, we retry
    long enough for the workers to start.
---
 .../controlprogram/federated/FederatedData.java    | 54 +++++++++++++++++-----
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 0e0f837301..3b332d596f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.controlprogram.federated;
 
 import java.io.Serializable;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -143,9 +144,8 @@ public class FederatedData {
                if(!_dataType.isMatrix() && !_dataType.isFrame())
                        throw new DMLRuntimeException("Federated datatype \"" + 
_dataType.toString() + "\" is not supported.");
                _varID = id;
-               FederatedRequest request = (mtd != null) ?
-                       new FederatedRequest(RequestType.READ_VAR, id, mtd) :
-                       new FederatedRequest(RequestType.READ_VAR, id);
+               FederatedRequest request = (mtd != null) ? new 
FederatedRequest(RequestType.READ_VAR, id,
+                       mtd) : new FederatedRequest(RequestType.READ_VAR, id);
                request.appendParam(_filepath);
                request.appendParam(_dataType.name());
                return executeFederatedOperation(request);
@@ -175,7 +175,20 @@ public class FederatedData {
         * @param request the requested operation
         * @return the response
         */
-       public synchronized static Future<FederatedResponse> 
executeFederatedOperation(InetSocketAddress address,
+       public static Future<FederatedResponse> 
executeFederatedOperation(InetSocketAddress address,
+               FederatedRequest... request) {
+               return executeFederatedOperation(address, 1, request);
+       }
+
+       /**
+        * Executes an federated operation on a federated worker.
+        *
+        * @param address socket address (incl host and port)
+        * @param retry   the retry count
+        * @param request the requested operation
+        * @return the response
+        */
+       public synchronized static Future<FederatedResponse> 
executeFederatedOperation(InetSocketAddress address, int retry,
                FederatedRequest... request) {
                try {
                        final Bootstrap b = new Bootstrap();
@@ -196,11 +209,28 @@ public class FederatedData {
                        return handler.getProm();
                }
                catch(Exception e) {
+                       if(e instanceof ConnectException) {
+
+                               if(retry < 5) {
+                                       try {
+                                               // Increasing retry timeout
+                                               Thread.sleep(200 * retry);
+                                       }
+                                       catch(Exception e2) {
+                                               throw new 
DMLRuntimeException(e);
+                                       }
+                                       return 
executeFederatedOperation(address, retry + 1, request);
+                               }
+                               else {
+                                       throw new DMLRuntimeException(e);
+                               }
+                       }
                        throw new DMLRuntimeException("Failed sending federated 
operation", e);
                }
        }
 
-       private static ChannelInitializer<SocketChannel> 
createChannel(InetSocketAddress address, DataRequestHandler handler){
+       private static ChannelInitializer<SocketChannel> 
createChannel(InetSocketAddress address,
+               DataRequestHandler handler) {
                final int timeout = ConfigurationManager.getFederatedTimeout();
                final boolean ssl = ConfigurationManager.isFederatedSSL();
 
@@ -240,9 +270,8 @@ public class FederatedData {
                }
        }
 
-       private static SslHandler createSSLHandler(SocketChannel ch, 
InetSocketAddress address){
-               return SslConstructor().context.newHandler(ch.alloc(), 
address.getAddress().getHostAddress(),
-                                                       address.getPort());
+       private static SslHandler createSSLHandler(SocketChannel ch, 
InetSocketAddress address) {
+               return SslConstructor().context.newHandler(ch.alloc(), 
address.getAddress().getHostAddress(), address.getPort());
        }
 
        public static void resetFederatedSites() {
@@ -313,19 +342,20 @@ public class FederatedData {
 
        public static class FederatedRequestEncoder extends ObjectEncoder {
                @Override
-               protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, 
Serializable msg,
-               boolean preferDirect) throws Exception {
+               protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, 
Serializable msg, boolean preferDirect)
+                       throws Exception {
                        int initCapacity = 256; // default initial capacity
                        if(msg instanceof FederatedRequest[]) {
                                initCapacity = 0;
                                try {
-                                       for(FederatedRequest fr : 
(FederatedRequest[])msg) {
+                                       for(FederatedRequest fr : 
(FederatedRequest[]) msg) {
                                                int frSize = 
Math.toIntExact(fr.estimateSerializationBufferSize());
                                                if(Integer.MAX_VALUE - 
initCapacity < frSize) // summed sizes exceed integer limits
                                                        throw new 
ArithmeticException("Overflow.");
                                                initCapacity += frSize;
                                        }
-                               } catch(ArithmeticException ae) { // size of 
federated request exceeds integer limits
+                               }
+                               catch(ArithmeticException ae) { // size of 
federated request exceeds integer limits
                                        initCapacity = Integer.MAX_VALUE;
                                }
                        }

Reply via email to