This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 89dba65   [To rel/0.12][IOTDB-1651]add reconnect to solve out of sequence (#4087)
89dba65 is described below

commit 89dba65c9824d2957737a062604e65205d778fa4
Author: yschengzi <[email protected]>
AuthorDate: Sat Oct 9 10:53:23 2021 +0800

     [To rel/0.12][IOTDB-1651]add reconnect to solve out of sequence (#4087)
---
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 41 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index b7f0540..6ff1143 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -95,7 +95,7 @@ public class SyncClient implements ISyncClient {
 
   private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
 
-  private static final int TIMEOUT_MS = 1000;
+  private static final int TIMEOUT_MS = 2000;
 
   /**
    * When transferring schema information, it is a better choice to transfer only new schema
@@ -127,6 +127,8 @@ public class SyncClient implements ISyncClient {
   /** Record sync progress in log. */
   private ISyncSenderLogger syncLog;
 
+  private boolean isSyncConnect = false;
+
   private ISyncFileManager syncFileManager = SyncFileManager.getInstance();
 
   private ScheduledExecutorService executorService;
@@ -222,6 +224,11 @@ public class SyncClient implements ISyncClient {
             syncAll();
           } catch (Exception e) {
             logger.error("Sync failed", e);
+          } finally {
+            if (transport != null && transport.isOpen()) {
+              transport.close();
+            }
+            isSyncConnect = false;
           }
         },
         SyncConstant.SYNC_PROCESS_DELAY,
@@ -311,12 +318,28 @@ public class SyncClient implements ISyncClient {
       if (!transport.isOpen()) {
         transport.open();
       }
+
+      isSyncConnect = true;
     } catch (TTransportException e) {
       logger.error("Cannot connect to the receiver.");
       throw new SyncConnectionException(e);
     }
   }
 
+  private boolean reconnect() {
+    if (transport != null && transport.isOpen()) {
+      transport.close();
+    }
+
+    try {
+      establishConnection(config.getServerIp(), config.getServerPort());
+    } catch (SyncConnectionException e) {
+      logger.warn("Can not reconnect to receiver {}. Caused by ", config.getSyncReceiverName(), e);
+      return false;
+    }
+    return true;
+  }
+
   @Override
   public void confirmIdentity() throws SyncConnectionException {
     try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
@@ -380,6 +403,10 @@ public class SyncClient implements ISyncClient {
             String.format(
                 "Can not sync schema after %s retries.", config.getMaxNumOfSyncFileRetry()));
       }
+      if (!isSyncConnect && !reconnect()) {
+        retryCount++;
+        continue;
+      }
       if (tryToSyncSchema()) {
         writeSyncSchemaPos(getSchemaPosFile());
         break;
@@ -417,7 +444,13 @@ public class SyncClient implements ISyncClient {
 
       // check digest
       return checkDigestForSchema(new BigInteger(1, md.digest()).toString(16));
-    } catch (NoSuchAlgorithmException | IOException | TException e) {
+    } catch (TException e) {
+      logger.error(
+          "Can not finish transfer schema to receiver, thrift error happen {}, try to reconnect",
+          e);
+      isSyncConnect = false;
+      return false;
+    } catch (NoSuchAlgorithmException | IOException e) {
       logger.error("Can not finish transfer schema to receiver", e);
       return false;
     }
@@ -550,6 +583,9 @@ public class SyncClient implements ISyncClient {
     logger.info("Start to sync names of deleted files in storage group {}", sgName);
     for (File file : deletedFilesName) {
       try {
+        if (!isSyncConnect && !reconnect()) {
+          continue;
+        }
         if (serviceClient.syncDeletedFileName(getFileNameWithSG(file)).code == SUCCESS_CODE) {
           logger.info(
               "Receiver has received deleted file name {} successfully.", getFileNameWithSG(file));
@@ -558,6 +594,7 @@ public class SyncClient implements ISyncClient {
         }
       } catch (TException e) {
         logger.error("Can not sync deleted file name {}, skip it.", file);
+        isSyncConnect = false;
       }
     }
     logger.info("Finish to sync names of deleted files in storage group {}", sgName);

Reply via email to