This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b0af35219e [IOTDB-3144] refactor sync recovery  (#5985)
b0af35219e is described below

commit b0af35219ee78b9b39cb0e889963994a28896d7f
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jun 9 17:51:48 2022 +0800

    [IOTDB-3144] refactor sync recovery  (#5985)
---
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |   2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   7 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   4 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  12 +
 .../iotdb/db/sync/sender/service/MsgManager.java   |   2 +-
 .../db/sync/sender/service/SenderService.java      |   5 +
 .../db/sync/sender/service/TransportHandler.java   |  12 +-
 .../db/sync/transport/client/ClientWrapper.java    | 125 ++++++++++
 .../db/sync/transport/client/ITransportClient.java |   2 +
 .../db/sync/transport/client/TransportClient.java  | 258 ++++++++++++---------
 .../transport/server/TransportServiceImpl.java     |  29 ++-
 11 files changed, 336 insertions(+), 122 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 7f296c5263..932441c02a 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -107,8 +107,8 @@ public class IoTDBSyncReceiverIT {
       Assert.fail("Failed to start pipe server because " + e.getMessage());
     }
     Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
-    client = new TransportClient(pipe, "127.0.0.1", 6670, "127.0.0.1");
     remoteIp1 = "127.0.0.1";
+    client = new TransportClient(pipe, remoteIp1, 6670, "127.0.0.1");
     client.handshake();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 28bd2ff8ae..d9b93e77ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1335,7 +1335,12 @@ public class PlanExecutor implements IPlanExecutor {
         record.addField(Binary.valueOf(pipe.getName()), TSDataType.TEXT);
         record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
         record.addField(Binary.valueOf(pipe.getPipeSink().getPipeSinkName()), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
+        if (pipe.getStatus().equals(Pipe.PipeStatus.RUNNING) && pipe.isDisconnected()) {
+          record.addField(
+              Binary.valueOf(pipe.getStatus().name() + "(DISCONNECTED)"), TSDataType.TEXT);
+        } else {
+          record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
+        }
         record.addField(
             Binary.valueOf(SenderService.getInstance().getPipeMsg(pipe)), TSDataType.TEXT);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index 94aae7b40b..92f6e7575e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -107,6 +107,10 @@ public interface Pipe {
    */
   void commit();
 
+  void setDisconnected(boolean disconnected);
+
+  boolean isDisconnected();
+
   // a new pipe should be stop status
   enum PipeStatus {
     RUNNING,
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 802c1a5ded..7a1417d034 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -62,6 +62,7 @@ public class TsFilePipe implements Pipe {
 
   private boolean isCollectingRealTimeData;
   private long maxSerialNumber;
+  private boolean disconnected; // true if pipe cannot connect to receiver
 
   private PipeStatus status;
 
@@ -84,6 +85,7 @@ public class TsFilePipe implements Pipe {
     this.maxSerialNumber = Math.max(0L, realTimeQueue.getLastMaxSerialNumber());
 
     this.status = PipeStatus.STOP;
+    this.disconnected = false;
   }
 
   @Override
@@ -275,6 +277,16 @@ public class TsFilePipe implements Pipe {
     realTimeQueue.commit();
   }
 
+  @Override
+  public void setDisconnected(boolean disconnected) {
+    this.disconnected = disconnected;
+  }
+
+  @Override
+  public boolean isDisconnected() {
+    return disconnected;
+  }
+
   public void commit(long serialNumber) {
     if (!historyQueue.isEmpty()) {
       historyQueue.commit(serialNumber);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
index 4853dec5d1..2ddf86f23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
@@ -82,7 +82,7 @@ public class MsgManager {
   public synchronized String getPipeMsg(Pipe pipe) {
     if (runningPipe == null) {
       return "";
-    } else if (!pipe.equals(runningPipe) || Messages.size() == 0) {
+    } else if (!pipe.equals(runningPipe)) {
       return "";
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index ec5a511507..9faa79876d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -247,6 +247,7 @@ public class SenderService implements IService {
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
+        sendMsg(RequestType.START);
         runningPipe.start();
         transportHandler.start();
       } else { // for external PIPE
@@ -352,6 +353,10 @@ public class SenderService implements IService {
     }
   }
 
+  public void setConnecting(boolean isConnecting) {
+    runningPipe.setDisconnected(isConnecting);
+  }
+
   /** transport */
   private void sendMsg(RequestType type) throws PipeException {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index 720557f235..010ea39ca6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -53,6 +53,7 @@ public class TransportHandler {
   private long createTime;
   private final String localIP;
   protected ITransportClient transportClient;
+  private final Pipe pipe;
 
   protected ExecutorService transportExecutorService;
   private Future transportFuture;
@@ -61,10 +62,9 @@ public class TransportHandler {
   private Future heartbeatFuture;
 
   public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+    this.pipe = pipe;
     this.pipeName = pipe.getName();
     this.createTime = pipe.getCreateTime();
-    this.localIP = getLocalIP(pipeSink);
-    this.transportClient = new TransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
 
     this.transportExecutorService =
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -72,6 +72,9 @@ public class TransportHandler {
     this.heartbeatExecutorService =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
             ThreadName.SYNC_SENDER_HEARTBEAT.getName() + "-" + pipeName);
+
+    this.localIP = getLocalIP(pipeSink);
+    this.transportClient = new TransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
   }
 
   private String getLocalIP(IoTDBPipeSink pipeSink) {
@@ -136,7 +139,12 @@ public class TransportHandler {
           .receiveMsg(
               transportClient.heartbeat(
                   new SyncRequest(RequestType.HEARTBEAT, pipeName, localIP, createTime)));
+      synchronized (((TransportClient) transportClient).getWaitLock()) {
+        pipe.setDisconnected(false);
+        ((TransportClient) transportClient).getWaitLock().notifyAll();
+      }
     } catch (SyncConnectionException e) {
+      pipe.setDisconnected(true);
       logger.warn(
           String.format(
               "Pipe %s sends heartbeat to receiver error, skip this time, because %s.",
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
new file mode 100644
index 0000000000..ba8ceaaf3d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.SUCCESS_CODE;
+
+public class ClientWrapper {
+  private static final Logger logger = LoggerFactory.getLogger(ClientWrapper.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private TTransport transport = null;
+  private volatile TransportService.Client serviceClient = null;
+
+  /* remote IP address*/
+  private final String ipAddress;
+  /* remote port */
+  private final int port;
+  /* local IP address*/
+  private final String localIP;
+
+  private final Pipe pipe;
+
+  public ClientWrapper(Pipe pipe, String ipAddress, int port, String localIP) {
+    this.pipe = pipe;
+    this.ipAddress = ipAddress;
+    this.port = port;
+    this.localIP = localIP;
+  }
+
+  public TransportService.Client getClient() {
+    return serviceClient;
+  }
+
+  /**
+   * create connection and handshake before sending messages
+   *
+   * @return true if success; false if failed to check IoTDB version.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  public boolean handshakeWithVersion() throws SyncConnectionException {
+    if (transport != null && transport.isOpen()) {
+      transport.close();
+    }
+
+    try {
+      transport =
+          RpcTransportFactory.INSTANCE.getTransport(
+              new TSocket(
+                  TConfigurationConst.defaultTConfiguration,
+                  ipAddress,
+                  port,
+                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
+      TProtocol protocol;
+      if (config.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
+      }
+      serviceClient = new TransportService.Client(protocol);
+
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
+      }
+
+      IdentityInfo identityInfo =
+          new IdentityInfo(
+              localIP, pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion());
+      TransportStatus status = serviceClient.handshake(identityInfo);
+      if (status.code != SUCCESS_CODE) {
+        logger.error("The receiver rejected the synchronization task because {}", status.msg);
+        return false;
+      }
+    } catch (TException e) {
+      logger.warn("Cannot connect to the receiver because {}", e.getMessage());
+      throw new SyncConnectionException(
+          String.format("Cannot connect to the receiver because %s.", e.getMessage()));
+    }
+    return true;
+  }
+
+  public void close() {
+    if (transport != null) {
+      transport.close();
+      transport = null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
index 9893efa651..2a99570e7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
@@ -24,5 +24,7 @@ import org.apache.iotdb.service.transport.thrift.SyncRequest;
 import org.apache.iotdb.service.transport.thrift.SyncResponse;
 
 public interface ITransportClient extends Runnable {
+
+  /** send control message */
   SyncResponse heartbeat(SyncRequest syncRequest) throws SyncConnectionException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index 7613f3f8ec..905a3fb1fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
 import org.apache.iotdb.service.transport.thrift.MetaInfo;
 import org.apache.iotdb.service.transport.thrift.RequestType;
 import org.apache.iotdb.service.transport.thrift.ResponseType;
@@ -73,92 +72,69 @@ public class TransportClient implements ITransportClient {
 
   private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
 
-  private TTransport transport = null;
+  private final ClientWrapper serviceClient;
+  private final ClientWrapper heartbeatClient;
 
-  private TransportService.Client serviceClient = null;
+  /* remote IP address*/
+  private final String ipAddress;
+  /* remote port */
+  private final int port;
+  /* local IP address*/
+  private final String localIP;
 
-  private String ipAddress;
-  private String localIP;
+  private final Pipe pipe;
 
-  private int port;
-
-  private IdentityInfo identityInfo = null;
-
-  private Pipe pipe;
+  /* hold this lock to wait until successfully reconnect to receiver */
+  private final Object waitLock;
 
+  /**
+   * @param pipe sync task
+   * @param ipAddress remote ip address
+   * @param port remote port
+   * @param localIP local ip address
+   */
   public TransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
     RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
-
     this.pipe = pipe;
     this.ipAddress = ipAddress;
-    this.localIP = localIP;
     this.port = port;
+    this.waitLock = new Object();
+    this.localIP = localIP;
+    serviceClient = new ClientWrapper(pipe, ipAddress, port, localIP);
+    heartbeatClient = new ClientWrapper(pipe, ipAddress, port, localIP);
   }
 
-  public boolean handshake() {
-    int handshakeCounter = 0;
-    try {
-      while (!handshakeWithVersion()) {
-        handshakeCounter++;
-        if (handshakeCounter > config.getMaxNumberOfSyncFileRetry()) {
-          logger.error(
-              String.format(
-                  "Handshake failed %s times! Check network.",
-                  config.getMaxNumberOfSyncFileRetry()));
-          return false;
-        }
-        logger.info(
+  public Object getWaitLock() {
+    return waitLock;
+  }
+
+  /**
+   * Create thrift connection to receiver. (1) register pipe message, including pipeName, localIp
+   * and createTime (2) check IoTDB version to make sure compatibility
+   *
+   * @return true if success; false if failed MaxNumberOfSyncFileRetry times.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  public synchronized boolean handshake() throws SyncConnectionException {
+    for (int handshakeCounter = 0;
+        handshakeCounter < config.getMaxNumberOfSyncFileRetry();
+        handshakeCounter++) {
+      try {
+        return serviceClient.handshakeWithVersion();
+      } catch (SyncConnectionException e) {
+        logger.warn(
             String.format(
                 "Handshake error, retry %d/%d.",
                 handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
       }
-    } catch (SyncConnectionException e) {
-      logger.error(String.format("Handshake failed and can not retry, because %s.", e), e);
-      return false;
     }
-    return true;
-  }
-
-  private boolean handshakeWithVersion() throws SyncConnectionException {
-    if (transport != null && transport.isOpen()) {
-      transport.close();
-    }
-
-    try {
-      transport =
-          RpcTransportFactory.INSTANCE.getTransport(
-              new TSocket(
-                  TConfigurationConst.defaultTConfiguration,
-                  ipAddress,
-                  port,
-                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
-                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
-      TProtocol protocol;
-      if (config.isRpcThriftCompressionEnable()) {
-        protocol = new TCompactProtocol(transport);
-      } else {
-        protocol = new TBinaryProtocol(transport);
-      }
-      serviceClient = new TransportService.Client(protocol);
-
-      // Underlay socket open.
-      if (!transport.isOpen()) {
-        transport.open();
-      }
-
-      identityInfo =
-          new IdentityInfo(
-              localIP, pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion());
-      TransportStatus status = serviceClient.handshake(identityInfo);
-      if (status.code != SUCCESS_CODE) {
-        throw new SyncConnectionException(
-            "The receiver rejected the synchronization task because " + status.msg);
-      }
-    } catch (TException e) {
-      logger.warn("Cannot connect to the receiver. ", e);
+    if (!serviceClient.handshakeWithVersion()) {
+      logger.info(
+          String.format("Handshake failed %s times!", config.getMaxNumberOfSyncFileRetry()));
       return false;
+    } else {
+      return true;
     }
-    return true;
   }
 
   public boolean senderTransport(PipeData pipeData) throws SyncConnectionException {
@@ -193,7 +169,15 @@ public class TransportClient implements ITransportClient {
         break;
       } catch (SyncConnectionException e) {
         // handshake and retry
-        if (!handshake()) {
+        try {
+          if (!handshake()) {
+            logger.error(
+                String.format(
+                    "Handshake to receiver %s:%d error when transfer pipe data %s.",
+                    ipAddress, port, pipeData));
+            return false;
+          }
+        } catch (SyncConnectionException syncConnectionException) {
           logger.error(
               String.format(
                   "Reconnect to receiver %s:%d error when transfer pipe data %s.",
@@ -243,7 +227,14 @@ public class TransportClient implements ITransportClient {
         }
         break;
       } catch (SyncConnectionException e) {
-        if (!handshake()) {
+        // handshake and retry
+        try {
+          if (!handshake()) {
+            throw new SyncConnectionException(
+                String.format(
+                    "Handshake with receiver error when transferring file %s.", file.getName()));
+          }
+        } catch (SyncConnectionException syncConnectionException) {
           throw new SyncConnectionException(
               String.format(
                   "Connect to receiver error when transferring file %s.", file.getName()));
@@ -299,8 +290,9 @@ public class TransportClient implements ITransportClient {
             }
             try {
               status =
-                  serviceClient.transportData(
-                      metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+                  serviceClient
+                      .getClient()
+                      .transportData(metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
             } catch (TException e) {
               // retry
               logger.error("TException happened! ", e);
@@ -378,7 +370,10 @@ public class TransportClient implements ITransportClient {
                 file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
       }
       try {
-        status = serviceClient.checkFileDigest(metaInfo, ByteBuffer.wrap(messageDigest.digest()));
+        status =
+            serviceClient
+                .getClient()
+                .checkFileDigest(metaInfo, ByteBuffer.wrap(messageDigest.digest()));
       } catch (TException e) {
         // retry
         logger.error("TException happens! ", e);
@@ -418,8 +413,9 @@ public class TransportClient implements ITransportClient {
         MetaInfo metaInfo =
             new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
         TransportStatus status =
-            serviceClient.transportData(
-                metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+            serviceClient
+                .getClient()
+                .transportData(metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
 
         if (status.code == SUCCESS_CODE) {
           break;
@@ -446,42 +442,47 @@ public class TransportClient implements ITransportClient {
   @Override
   public void run() {
     try {
-      if (!handshake()) {
-        throw new SyncConnectionException(
-            String.format("Handshake with receiver %s:%d error.", ipAddress, port));
-      }
-      SenderService.getInstance()
-          .receiveMsg(
-              heartbeat(
-                  new SyncRequest(
-                      RequestType.START, pipe.getName(), localIP, pipe.getCreateTime())));
       while (!Thread.currentThread().isInterrupted()) {
-        PipeData pipeData = pipe.take();
-        if (!senderTransport(pipeData)) {
-          logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
-          // can do something.
+        try {
+          if (!handshake()) {
+            SenderService.getInstance()
+                .receiveMsg(
+                    new SyncResponse(
+                        ResponseType.ERROR,
+                        String.format("Can not handshake with %s:%d.", ipAddress, port)));
+          }
           SenderService.getInstance()
               .receiveMsg(
-                  new SyncResponse(
-                      ResponseType.WARN,
-                      String.format(
-                          "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
-          continue;
+                  heartbeat(
+                      new SyncRequest(
+                          RequestType.START, pipe.getName(), localIP, pipe.getCreateTime())));
+          while (!Thread.currentThread().isInterrupted()) {
+            PipeData pipeData = pipe.take();
+            if (!senderTransport(pipeData)) {
+              logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
+              // can do something.
+              SenderService.getInstance()
+                  .receiveMsg(
+                      new SyncResponse(
+                          ResponseType.WARN,
+                          String.format(
+                              "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
+              continue;
+            }
+            pipe.commit();
+          }
+        } catch (SyncConnectionException e) {
+          logger.error(
+              String.format("Connect to receiver %s:%d error, because %s.", ipAddress, port, e));
+          // wait and retry
+          synchronized (waitLock) {
+            SenderService.getInstance().setConnecting(true);
+            waitLock.wait();
+          }
         }
-        pipe.commit();
       }
     } catch (InterruptedException e) {
       logger.info("Interrupted by pipe, exit transport.");
-    } catch (SyncConnectionException e) {
-      logger.error(
-          String.format("Connect to receiver %s:%d error, because %s.", ipAddress, port, e));
-      SenderService.getInstance()
-          .receiveMsg(
-              new SyncResponse(
-                  ResponseType.ERROR,
-                  String.format(
-                      "Can not connect to %s:%d, please check receiver and Internet.",
-                      ipAddress, port)));
     } finally {
       close();
     }
@@ -489,6 +490,9 @@ public class TransportClient implements ITransportClient {
 
   @Override
   public SyncResponse heartbeat(SyncRequest syncRequest) throws SyncConnectionException {
+    if (syncRequest.getType().equals(RequestType.HEARTBEAT)) {
+      return requestHeartbeat(syncRequest);
+    }
     int retryCount = 0;
     while (true) {
       retryCount++;
@@ -528,9 +532,47 @@ public class TransportClient implements ITransportClient {
     }
   }
 
-  public void close() {
-    if (transport != null) {
-      transport.close();
+  /**
+   * deal with HEARTBEAT type request, use a special client to send request.
+   *
+   * @param syncRequest must be HEARTBEAT type request
+   * @throws SyncConnectionException cannot connect to receiver
+   */
+  private SyncResponse requestHeartbeat(SyncRequest syncRequest) throws SyncConnectionException {
+    if (heartbeatClient.getClient() == null) {
+      synchronized (heartbeatClient) {
+        if (heartbeatClient.getClient() == null) {
+          if (!heartbeatClient.handshakeWithVersion()) {
+            throw new SyncConnectionException("Handshake with receiver error when heartbeat.");
+          }
+        }
+      }
     }
+    int retryCount = 0;
+    while (true) {
+      retryCount++;
+      if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
+        throw new SyncConnectionException(
+            String.format(
+                "%s request connects to receiver %s:%d error.",
+                syncRequest.type.name(), ipAddress, port));
+      }
+      try {
+        return heartbeatClient.getClient().heartbeat(syncRequest);
+      } catch (TException e) {
+        if (!heartbeatClient.handshakeWithVersion()) {
+          throw new SyncConnectionException("Handshake with receiver error when heartbeat.");
+        }
+        logger.info(
+            String.format(
+                "Heartbeat connect to receiver %s:%d error, retry %d/%d.",
+                ipAddress, port, retryCount, config.getMaxNumberOfSyncFileRetry()));
+      }
+    }
+  }
+
+  public void close() {
+    serviceClient.close();
+    heartbeatClient.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index e8a748b77b..e52560db24 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -57,6 +57,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.text.DecimalFormat;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.iotdb.db.sync.conf.SyncConstant.DATA_CHUNK_SIZE;
 import static org.apache.iotdb.db.sync.transport.conf.TransportConstant.CONFLICT_CODE;
@@ -71,10 +73,12 @@ public class TransportServiceImpl implements TransportService.Iface {
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final String RECORD_SUFFIX = ".record";
   private static final String PATCH_SUFFIX = ".patch";
-  private ThreadLocal<IdentityInfo> identityInfoThreadLocal;
+  private final ThreadLocal<IdentityInfo> identityInfoThreadLocal;
+  private final Map<IdentityInfo, Integer> identityInfoCounter;
 
   public TransportServiceImpl() {
     identityInfoThreadLocal = new ThreadLocal<>();
+    identityInfoCounter = new ConcurrentHashMap<>();
   }
 
   private class CheckResult {
@@ -142,6 +146,7 @@ public class TransportServiceImpl implements TransportService.Iface {
   public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
     logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
     identityInfoThreadLocal.set(identityInfo);
+    identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 1 : v + 1);
     // check ip address
     if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
       return new TransportStatus(
@@ -351,15 +356,21 @@ public class TransportServiceImpl implements TransportService.Iface {
     // Handle client exit here.
     IdentityInfo identityInfo = identityInfoThreadLocal.get();
     if (identityInfo != null) {
-      // stop pipe
+      // if all connections exit, stop pipe
       identityInfoThreadLocal.remove();
-      ReceiverService.getInstance()
-          .receiveMsg(
-              new SyncRequest(
-                  RequestType.STOP,
-                  identityInfo.getPipeName(),
-                  identityInfo.getAddress(),
-                  identityInfo.getCreateTime()));
+      synchronized (identityInfoCounter) {
+        identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 0 : v - 1);
+        if (identityInfoCounter.get(identityInfo) == 0) {
+          identityInfoCounter.remove(identityInfo);
+          ReceiverService.getInstance()
+              .receiveMsg(
+                  new SyncRequest(
+                      RequestType.STOP,
+                      identityInfo.getPipeName(),
+                      identityInfo.getAddress(),
+                      identityInfo.getCreateTime()));
+        }
+      }
     }
   }
 

Reply via email to