This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b0af35219e [IOTDB-3144] refactor sync recovery (#5985)
b0af35219e is described below
commit b0af35219ee78b9b39cb0e889963994a28896d7f
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jun 9 17:51:48 2022 +0800
[IOTDB-3144] refactor sync recovery (#5985)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 7 +-
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 4 +
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 12 +
.../iotdb/db/sync/sender/service/MsgManager.java | 2 +-
.../db/sync/sender/service/SenderService.java | 5 +
.../db/sync/sender/service/TransportHandler.java | 12 +-
.../db/sync/transport/client/ClientWrapper.java | 125 ++++++++++
.../db/sync/transport/client/ITransportClient.java | 2 +
.../db/sync/transport/client/TransportClient.java | 258 ++++++++++++---------
.../transport/server/TransportServiceImpl.java | 29 ++-
11 files changed, 336 insertions(+), 122 deletions(-)
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 7f296c5263..932441c02a 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -107,8 +107,8 @@ public class IoTDBSyncReceiverIT {
Assert.fail("Failed to start pipe server because " + e.getMessage());
}
Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
- client = new TransportClient(pipe, "127.0.0.1", 6670, "127.0.0.1");
remoteIp1 = "127.0.0.1";
+ client = new TransportClient(pipe, remoteIp1, 6670, "127.0.0.1");
client.handshake();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 28bd2ff8ae..d9b93e77ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1335,7 +1335,12 @@ public class PlanExecutor implements IPlanExecutor {
record.addField(Binary.valueOf(pipe.getName()), TSDataType.TEXT);
record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE),
TSDataType.TEXT);
record.addField(Binary.valueOf(pipe.getPipeSink().getPipeSinkName()),
TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getStatus().name()),
TSDataType.TEXT);
+ if (pipe.getStatus().equals(Pipe.PipeStatus.RUNNING) &&
pipe.isDisconnected()) {
+ record.addField(
+ Binary.valueOf(pipe.getStatus().name() + "(DISCONNECTED)"),
TSDataType.TEXT);
+ } else {
+ record.addField(Binary.valueOf(pipe.getStatus().name()),
TSDataType.TEXT);
+ }
record.addField(
Binary.valueOf(SenderService.getInstance().getPipeMsg(pipe)),
TSDataType.TEXT);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index 94aae7b40b..92f6e7575e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -107,6 +107,10 @@ public interface Pipe {
*/
void commit();
+ void setDisconnected(boolean disconnected);
+
+ boolean isDisconnected();
+
// a new pipe should be stop status
enum PipeStatus {
RUNNING,
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 802c1a5ded..7a1417d034 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -62,6 +62,7 @@ public class TsFilePipe implements Pipe {
private boolean isCollectingRealTimeData;
private long maxSerialNumber;
+ private boolean disconnected; // true if pipe cannot connect to receiver
private PipeStatus status;
@@ -84,6 +85,7 @@ public class TsFilePipe implements Pipe {
this.maxSerialNumber = Math.max(0L,
realTimeQueue.getLastMaxSerialNumber());
this.status = PipeStatus.STOP;
+ this.disconnected = false;
}
@Override
@@ -275,6 +277,16 @@ public class TsFilePipe implements Pipe {
realTimeQueue.commit();
}
+ @Override
+ public void setDisconnected(boolean disconnected) {
+ this.disconnected = disconnected;
+ }
+
+ @Override
+ public boolean isDisconnected() {
+ return disconnected;
+ }
+
public void commit(long serialNumber) {
if (!historyQueue.isEmpty()) {
historyQueue.commit(serialNumber);
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
index 4853dec5d1..2ddf86f23f 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
@@ -82,7 +82,7 @@ public class MsgManager {
public synchronized String getPipeMsg(Pipe pipe) {
if (runningPipe == null) {
return "";
- } else if (!pipe.equals(runningPipe) || Messages.size() == 0) {
+ } else if (!pipe.equals(runningPipe)) {
return "";
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index ec5a511507..9faa79876d 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -247,6 +247,7 @@ public class SenderService implements IService {
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
+ sendMsg(RequestType.START);
runningPipe.start();
transportHandler.start();
} else { // for external PIPE
@@ -352,6 +353,10 @@ public class SenderService implements IService {
}
}
+ public void setConnecting(boolean isConnecting) {
+ runningPipe.setDisconnected(isConnecting);
+ }
+
/** transport */
private void sendMsg(RequestType type) throws PipeException {
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index 720557f235..010ea39ca6 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -53,6 +53,7 @@ public class TransportHandler {
private long createTime;
private final String localIP;
protected ITransportClient transportClient;
+ private final Pipe pipe;
protected ExecutorService transportExecutorService;
private Future transportFuture;
@@ -61,10 +62,9 @@ public class TransportHandler {
private Future heartbeatFuture;
public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+ this.pipe = pipe;
this.pipeName = pipe.getName();
this.createTime = pipe.getCreateTime();
- this.localIP = getLocalIP(pipeSink);
- this.transportClient = new TransportClient(pipe, pipeSink.getIp(),
pipeSink.getPort(), localIP);
this.transportExecutorService =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -72,6 +72,9 @@ public class TransportHandler {
this.heartbeatExecutorService =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.SYNC_SENDER_HEARTBEAT.getName() + "-" + pipeName);
+
+ this.localIP = getLocalIP(pipeSink);
+ this.transportClient = new TransportClient(pipe, pipeSink.getIp(),
pipeSink.getPort(), localIP);
}
private String getLocalIP(IoTDBPipeSink pipeSink) {
@@ -136,7 +139,12 @@ public class TransportHandler {
.receiveMsg(
transportClient.heartbeat(
new SyncRequest(RequestType.HEARTBEAT, pipeName, localIP,
createTime)));
+ synchronized (((TransportClient) transportClient).getWaitLock()) {
+ pipe.setDisconnected(false);
+ ((TransportClient) transportClient).getWaitLock().notifyAll();
+ }
} catch (SyncConnectionException e) {
+ pipe.setDisconnected(true);
logger.warn(
String.format(
"Pipe %s sends heartbeat to receiver error, skip this time,
because %s.",
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
new file mode 100644
index 0000000000..ba8ceaaf3d
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.iotdb.db.sync.transport.conf.TransportConstant.SUCCESS_CODE;
+
+public class ClientWrapper {
+ private static final Logger logger =
LoggerFactory.getLogger(ClientWrapper.class);
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
+ private TTransport transport = null;
+ private volatile TransportService.Client serviceClient = null;
+
+ /* remote IP address*/
+ private final String ipAddress;
+ /* remote port */
+ private final int port;
+ /* local IP address*/
+ private final String localIP;
+
+ private final Pipe pipe;
+
+ public ClientWrapper(Pipe pipe, String ipAddress, int port, String localIP) {
+ this.pipe = pipe;
+ this.ipAddress = ipAddress;
+ this.port = port;
+ this.localIP = localIP;
+ }
+
+ public TransportService.Client getClient() {
+ return serviceClient;
+ }
+
+ /**
+ * create connection and handshake before sending messages
+ *
+ * @return true if success; false if failed to check IoTDB version.
+ * @throws SyncConnectionException cannot create connection to receiver
+ */
+ public boolean handshakeWithVersion() throws SyncConnectionException {
+ if (transport != null && transport.isOpen()) {
+ transport.close();
+ }
+
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ ipAddress,
+ port,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
+ TProtocol protocol;
+ if (config.isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ serviceClient = new TransportService.Client(protocol);
+
+ // Underlay socket open.
+ if (!transport.isOpen()) {
+ transport.open();
+ }
+
+ IdentityInfo identityInfo =
+ new IdentityInfo(
+ localIP, pipe.getName(), pipe.getCreateTime(),
config.getIoTDBMajorVersion());
+ TransportStatus status = serviceClient.handshake(identityInfo);
+ if (status.code != SUCCESS_CODE) {
+ logger.error("The receiver rejected the synchronization task because
{}", status.msg);
+ return false;
+ }
+ } catch (TException e) {
+ logger.warn("Cannot connect to the receiver because {}", e.getMessage());
+ throw new SyncConnectionException(
+ String.format("Cannot connect to the receiver because %s.",
e.getMessage()));
+ }
+ return true;
+ }
+
+ public void close() {
+ if (transport != null) {
+ transport.close();
+ transport = null;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
index 9893efa651..2a99570e7b 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
@@ -24,5 +24,7 @@ import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
public interface ITransportClient extends Runnable {
+
+ /** send control message */
SyncResponse heartbeat(SyncRequest syncRequest) throws
SyncConnectionException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index 7613f3f8ec..905a3fb1fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
import org.apache.iotdb.service.transport.thrift.MetaInfo;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.ResponseType;
@@ -73,92 +72,69 @@ public class TransportClient implements ITransportClient {
private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
- private TTransport transport = null;
+ private final ClientWrapper serviceClient;
+ private final ClientWrapper heartbeatClient;
- private TransportService.Client serviceClient = null;
+ /* remote IP address*/
+ private final String ipAddress;
+ /* remote port */
+ private final int port;
+ /* local IP address*/
+ private final String localIP;
- private String ipAddress;
- private String localIP;
+ private final Pipe pipe;
- private int port;
-
- private IdentityInfo identityInfo = null;
-
- private Pipe pipe;
+ /* hold this lock to wait until successfully reconnect to receiver */
+ private final Object waitLock;
/**
 * Creates the client for one pipe. Two independent connection wrappers are prepared, one for data
 * transfer and one for heartbeats; the constructor itself opens neither of them.
 *
 * @param pipe sync task
 * @param ipAddress remote ip address
 * @param port remote port
 * @param localIP local ip address
 */
public TransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
  RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
  this.pipe = pipe;
  this.ipAddress = ipAddress;
  this.port = port;
  this.localIP = localIP;
  this.waitLock = new Object();
  this.serviceClient = new ClientWrapper(pipe, ipAddress, port, localIP);
  this.heartbeatClient = new ClientWrapper(this.pipe, this.ipAddress, this.port, this.localIP);
}
- public boolean handshake() {
- int handshakeCounter = 0;
- try {
- while (!handshakeWithVersion()) {
- handshakeCounter++;
- if (handshakeCounter > config.getMaxNumberOfSyncFileRetry()) {
- logger.error(
- String.format(
- "Handshake failed %s times! Check network.",
- config.getMaxNumberOfSyncFileRetry()));
- return false;
- }
- logger.info(
+ public Object getWaitLock() {
+ return waitLock;
+ }
+
+ /**
+ * Create thrift connection to receiver. (1) register pipe message,
including pipeName, localIp
+ * and createTime (2) check IoTDB version to make sure compatibility
+ *
+ * @return true if success; false if failed MaxNumberOfSyncFileRetry times.
+ * @throws SyncConnectionException cannot create connection to receiver
+ */
+ public synchronized boolean handshake() throws SyncConnectionException {
+ for (int handshakeCounter = 0;
+ handshakeCounter < config.getMaxNumberOfSyncFileRetry();
+ handshakeCounter++) {
+ try {
+ return serviceClient.handshakeWithVersion();
+ } catch (SyncConnectionException e) {
+ logger.warn(
String.format(
"Handshake error, retry %d/%d.",
handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
}
- } catch (SyncConnectionException e) {
- logger.error(String.format("Handshake failed and can not retry, because
%s.", e), e);
- return false;
}
- return true;
- }
-
- private boolean handshakeWithVersion() throws SyncConnectionException {
- if (transport != null && transport.isOpen()) {
- transport.close();
- }
-
- try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- ipAddress,
- port,
- SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
- SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
- TProtocol protocol;
- if (config.isRpcThriftCompressionEnable()) {
- protocol = new TCompactProtocol(transport);
- } else {
- protocol = new TBinaryProtocol(transport);
- }
- serviceClient = new TransportService.Client(protocol);
-
- // Underlay socket open.
- if (!transport.isOpen()) {
- transport.open();
- }
-
- identityInfo =
- new IdentityInfo(
- localIP, pipe.getName(), pipe.getCreateTime(),
config.getIoTDBMajorVersion());
- TransportStatus status = serviceClient.handshake(identityInfo);
- if (status.code != SUCCESS_CODE) {
- throw new SyncConnectionException(
- "The receiver rejected the synchronization task because " +
status.msg);
- }
- } catch (TException e) {
- logger.warn("Cannot connect to the receiver. ", e);
+ if (!serviceClient.handshakeWithVersion()) {
+ logger.info(
+ String.format("Handshake failed %s times!",
config.getMaxNumberOfSyncFileRetry()));
return false;
+ } else {
+ return true;
}
- return true;
}
public boolean senderTransport(PipeData pipeData) throws
SyncConnectionException {
@@ -193,7 +169,15 @@ public class TransportClient implements ITransportClient {
break;
} catch (SyncConnectionException e) {
// handshake and retry
- if (!handshake()) {
+ try {
+ if (!handshake()) {
+ logger.error(
+ String.format(
+ "Handshake to receiver %s:%d error when transfer pipe data
%s.",
+ ipAddress, port, pipeData));
+ return false;
+ }
+ } catch (SyncConnectionException syncConnectionException) {
logger.error(
String.format(
"Reconnect to receiver %s:%d error when transfer pipe data
%s.",
@@ -243,7 +227,14 @@ public class TransportClient implements ITransportClient {
}
break;
} catch (SyncConnectionException e) {
- if (!handshake()) {
+ // handshake and retry
+ try {
+ if (!handshake()) {
+ throw new SyncConnectionException(
+ String.format(
+ "Handshake with receiver error when transferring file
%s.", file.getName()));
+ }
+ } catch (SyncConnectionException syncConnectionException) {
throw new SyncConnectionException(
String.format(
"Connect to receiver error when transferring file %s.",
file.getName()));
@@ -299,8 +290,9 @@ public class TransportClient implements ITransportClient {
}
try {
status =
- serviceClient.transportData(
- metaInfo, buffToSend,
ByteBuffer.wrap(messageDigest.digest()));
+ serviceClient
+ .getClient()
+ .transportData(metaInfo, buffToSend,
ByteBuffer.wrap(messageDigest.digest()));
} catch (TException e) {
// retry
logger.error("TException happened! ", e);
@@ -378,7 +370,10 @@ public class TransportClient implements ITransportClient {
file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
}
try {
- status = serviceClient.checkFileDigest(metaInfo,
ByteBuffer.wrap(messageDigest.digest()));
+ status =
+ serviceClient
+ .getClient()
+ .checkFileDigest(metaInfo,
ByteBuffer.wrap(messageDigest.digest()));
} catch (TException e) {
// retry
logger.error("TException happens! ", e);
@@ -418,8 +413,9 @@ public class TransportClient implements ITransportClient {
MetaInfo metaInfo =
new MetaInfo(Type.findByValue(pipeData.getType().ordinal()),
"fileName", 0);
TransportStatus status =
- serviceClient.transportData(
- metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+ serviceClient
+ .getClient()
+ .transportData(metaInfo, buffToSend,
ByteBuffer.wrap(messageDigest.digest()));
if (status.code == SUCCESS_CODE) {
break;
@@ -446,42 +442,47 @@ public class TransportClient implements ITransportClient {
@Override
public void run() {
try {
- if (!handshake()) {
- throw new SyncConnectionException(
- String.format("Handshake with receiver %s:%d error.", ipAddress,
port));
- }
- SenderService.getInstance()
- .receiveMsg(
- heartbeat(
- new SyncRequest(
- RequestType.START, pipe.getName(), localIP,
pipe.getCreateTime())));
while (!Thread.currentThread().isInterrupted()) {
- PipeData pipeData = pipe.take();
- if (!senderTransport(pipeData)) {
- logger.error(String.format("Can not transfer pipedata %s, skip it.",
pipeData));
- // can do something.
+ try {
+ if (!handshake()) {
+ SenderService.getInstance()
+ .receiveMsg(
+ new SyncResponse(
+ ResponseType.ERROR,
+ String.format("Can not handshake with %s:%d.",
ipAddress, port)));
+ }
SenderService.getInstance()
.receiveMsg(
- new SyncResponse(
- ResponseType.WARN,
- String.format(
- "Transfer piepdata %s error, skip it.",
pipeData.getSerialNumber())));
- continue;
+ heartbeat(
+ new SyncRequest(
+ RequestType.START, pipe.getName(), localIP,
pipe.getCreateTime())));
+ while (!Thread.currentThread().isInterrupted()) {
+ PipeData pipeData = pipe.take();
+ if (!senderTransport(pipeData)) {
+ logger.error(String.format("Can not transfer pipedata %s, skip
it.", pipeData));
+ // can do something.
+ SenderService.getInstance()
+ .receiveMsg(
+ new SyncResponse(
+ ResponseType.WARN,
+ String.format(
+ "Transfer piepdata %s error, skip it.",
pipeData.getSerialNumber())));
+ continue;
+ }
+ pipe.commit();
+ }
+ } catch (SyncConnectionException e) {
+ logger.error(
+ String.format("Connect to receiver %s:%d error, because %s.",
ipAddress, port, e));
+ // wait and retry
+ synchronized (waitLock) {
+ SenderService.getInstance().setConnecting(true);
+ waitLock.wait();
+ }
}
- pipe.commit();
}
} catch (InterruptedException e) {
logger.info("Interrupted by pipe, exit transport.");
- } catch (SyncConnectionException e) {
- logger.error(
- String.format("Connect to receiver %s:%d error, because %s.",
ipAddress, port, e));
- SenderService.getInstance()
- .receiveMsg(
- new SyncResponse(
- ResponseType.ERROR,
- String.format(
- "Can not connect to %s:%d, please check receiver and
Internet.",
- ipAddress, port)));
} finally {
close();
}
@@ -489,6 +490,9 @@ public class TransportClient implements ITransportClient {
@Override
public SyncResponse heartbeat(SyncRequest syncRequest) throws
SyncConnectionException {
+ if (syncRequest.getType().equals(RequestType.HEARTBEAT)) {
+ return requestHeartbeat(syncRequest);
+ }
int retryCount = 0;
while (true) {
retryCount++;
@@ -528,9 +532,47 @@ public class TransportClient implements ITransportClient {
}
}
- public void close() {
- if (transport != null) {
- transport.close();
+ /**
+ * deal with HEARTBEAT type request, use a special client to send request.
+ *
+ * @param syncRequest must be HEARTBEAT type request
+ * @throws SyncConnectionException cannot connect to receiver
+ */
+ private SyncResponse requestHeartbeat(SyncRequest syncRequest) throws
SyncConnectionException {
+ if (heartbeatClient.getClient() == null) {
+ synchronized (heartbeatClient) {
+ if (heartbeatClient.getClient() == null) {
+ if (!heartbeatClient.handshakeWithVersion()) {
+ throw new SyncConnectionException("Handshake with receiver error
when heartbeat.");
+ }
+ }
+ }
}
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
+ throw new SyncConnectionException(
+ String.format(
+ "%s request connects to receiver %s:%d error.",
+ syncRequest.type.name(), ipAddress, port));
+ }
+ try {
+ return heartbeatClient.getClient().heartbeat(syncRequest);
+ } catch (TException e) {
+ if (!heartbeatClient.handshakeWithVersion()) {
+ throw new SyncConnectionException("Handshake with receiver error
when heartbeat.");
+ }
+ logger.info(
+ String.format(
+ "Heartbeat connect to receiver %s:%d error, retry %d/%d.",
+ ipAddress, port, retryCount,
config.getMaxNumberOfSyncFileRetry()));
+ }
+ }
+ }
+
+ public void close() {
+ serviceClient.close();
+ heartbeatClient.close();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index e8a748b77b..e52560db24 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -57,6 +57,8 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.iotdb.db.sync.conf.SyncConstant.DATA_CHUNK_SIZE;
import static
org.apache.iotdb.db.sync.transport.conf.TransportConstant.CONFLICT_CODE;
@@ -71,10 +73,12 @@ public class TransportServiceImpl implements
TransportService.Iface {
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String RECORD_SUFFIX = ".record";
private static final String PATCH_SUFFIX = ".patch";
- private ThreadLocal<IdentityInfo> identityInfoThreadLocal;
/** Identity registered by the most recent handshake handled on the current thread. */
private final ThreadLocal<IdentityInfo> identityInfoThreadLocal = new ThreadLocal<>();
/**
 * Handshake count per sender identity; on client exit the count is decremented and the pipe is
 * stopped once it reaches zero.
 */
private final Map<IdentityInfo, Integer> identityInfoCounter = new ConcurrentHashMap<>();

/** Creates the service; its state is fully set up by the field initializers above. */
public TransportServiceImpl() {}
private class CheckResult {
@@ -142,6 +146,7 @@ public class TransportServiceImpl implements
TransportService.Iface {
public TransportStatus handshake(IdentityInfo identityInfo) throws
TException {
logger.debug("Invoke handshake method from client ip = {}",
identityInfo.address);
identityInfoThreadLocal.set(identityInfo);
+ identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 1 : v + 1);
// check ip address
if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
return new TransportStatus(
@@ -351,15 +356,21 @@ public class TransportServiceImpl implements
TransportService.Iface {
// Handle client exit here.
IdentityInfo identityInfo = identityInfoThreadLocal.get();
if (identityInfo != null) {
- // stop pipe
+ // if all connections exit, stop pipe
identityInfoThreadLocal.remove();
- ReceiverService.getInstance()
- .receiveMsg(
- new SyncRequest(
- RequestType.STOP,
- identityInfo.getPipeName(),
- identityInfo.getAddress(),
- identityInfo.getCreateTime()));
+ synchronized (identityInfoCounter) {
+ identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 0 : v
- 1);
+ if (identityInfoCounter.get(identityInfo) == 0) {
+ identityInfoCounter.remove(identityInfo);
+ ReceiverService.getInstance()
+ .receiveMsg(
+ new SyncRequest(
+ RequestType.STOP,
+ identityInfo.getPipeName(),
+ identityInfo.getAddress(),
+ identityInfo.getCreateTime()));
+ }
+ }
}
}