This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6517daf661 [IOTDB-3950] Transport layer optimization for sync modules (#7038)
6517daf661 is described below
commit 6517daf661a9124b6bab6a5f1a0d57235e00b035
Author: Chen YZ <[email protected]>
AuthorDate: Wed Aug 24 20:16:39 2022 +0800
[IOTDB-3950] Transport layer optimization for sync modules (#7038)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 6 +-
.../db/integration/sync/IoTDBSyncSenderIT.java | 13 +-
...ortClientMock.java => MockTransportClient.java} | 42 +-
.../db/integration/sync/TransportHandlerMock.java | 58 ---
.../apache/iotdb/commons/sync/SyncConstant.java | 2 +
.../service/thrift/impl/ClientRPCServiceImpl.java | 10 +-
.../db/service/thrift/impl/TSServiceImpl.java | 10 +-
.../java/org/apache/iotdb/db/sync/SyncService.java | 49 +--
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 2 +-
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 2 +-
.../org/apache/iotdb/db/sync/common/SyncInfo.java | 2 +-
.../db/sync/common/persistence/SyncLogReader.java | 2 +-
.../db/sync/common/persistence/SyncLogWriter.java | 2 +-
.../manager => sender/pipe}/PipeMessage.java | 2 +-
.../db/sync/sender/service/TransportHandler.java | 126 ------
.../db/sync/transport/client/ClientWrapper.java | 124 ------
.../db/sync/transport/client/ITransportClient.java | 23 +-
.../transport/client/IoTDBSinkTransportClient.java | 450 ++++++---------------
.../db/sync/transport/client/SenderManager.java | 120 ++++++
.../transport/client/TransportClientFactory.java | 68 ++++
.../db/sync/transport/conf/TransportConfig.java | 45 ---
.../db/sync/transport/server/ReceiverManager.java | 323 +++++++--------
.../db/sync/receiver/manager/SyncInfoTest.java | 1 +
.../db/sync/receiver/recovery/SyncLogTest.java | 2 +-
.../iotdb/db/sync/transport/SyncTransportTest.java | 173 +++++++-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
thrift/src/main/thrift/client.thrift | 17 +-
27 files changed, 702 insertions(+), 975 deletions(-)
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 67a4ab4618..a6115d536b 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -166,16 +166,16 @@ public class IoTDBSyncReceiverIT {
null));
planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
for (PhysicalPlan plan : planList) {
- client.senderTransport(new SchemaPipeData(plan, serialNum++));
+ client.send(new SchemaPipeData(plan, serialNum++));
}
List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
SyncTestUtil.renameTsFiles(tsFiles);
for (File f : tsFiles) {
- client.senderTransport(new TsFilePipeData(f.getPath(), serialNum++));
+ client.send(new TsFilePipeData(f.getPath(), serialNum++));
}
Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0,
33, 38);
PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
- client.senderTransport(pipeData);
+ client.send(pipeData);
// wait collector to load pipe data
Thread.sleep(1000);
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index 21952d47c9..368b0ff0c8 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -22,14 +22,13 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -63,8 +62,7 @@ public class IoTDBSyncSenderIT {
private static final String pipeSinkName = "test_pipesink";
private static final String pipeName = "test_pipe";
- private TransportHandlerMock handler;
- private TransportClientMock transportClient;
+ private MockTransportClient transportClient;
private final Map<String, List<PipeData>> resultMap = new HashMap<>();
private static final TsFilePipeData simpleTsFilePipeData =
@@ -89,10 +87,7 @@ public class IoTDBSyncSenderIT {
Class.forName(Config.JDBC_DRIVER_NAME);
IoTDBPipeSink pipeSink = new IoTDBPipeSink(pipeSinkName);
- TsFilePipe pipe = new TsFilePipe(0L, pipeName, pipeSink, 0L, true);
- transportClient = new TransportClientMock(pipe, pipeSink.getIp(), pipeSink.getPort());
- handler = new TransportHandlerMock(pipe, pipeSink, transportClient);
- TransportHandler.setDebugTransportHandler(handler);
+ transportClient = new MockTransportClient();
LocalSyncInfoFetcher.getInstance().reset();
}
@@ -255,6 +250,7 @@ public class IoTDBSyncSenderIT {
statement.execute("create pipesink " + pipeSinkName + " as iotdb");
statement.execute("create pipe " + pipeName + " to " + pipeSinkName);
}
+ SyncService.getInstance().getSenderManager().setTransportClient(transportClient);
}
private void restart() throws Exception {
@@ -262,6 +258,7 @@ public class IoTDBSyncSenderIT {
EnvironmentUtils.shutdownDaemon();
WALRecoverManager.getInstance().clear();
EnvironmentUtils.reactiveDaemon();
+ SyncService.getInstance().getSenderManager().setTransportClient(transportClient);
}
private void startPipe() throws Exception {
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
similarity index 59%
rename from
integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
rename to
integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
index aa10f630d0..25b59f10b0 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockTransportClient.java
@@ -19,48 +19,34 @@
package org.apache.iotdb.db.integration.sync;
import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
import java.util.ArrayList;
import java.util.List;
-public class TransportClientMock implements ITransportClient {
- private Pipe pipe;
- private String ipAddress;
- private int port;
+public class MockTransportClient implements ITransportClient {
- private List<PipeData> pipeDataList;
-
- public TransportClientMock(Pipe pipe, String ipAddress, int port) {
- this.pipe = pipe;
- this.ipAddress = ipAddress;
- this.port = port;
+ private final List<PipeData> pipeDataList;
+ public MockTransportClient() {
this.pipeDataList = new ArrayList<>();
}
- public void resetInfo(Pipe pipe, String ipAddress, int port) {
- this.pipe = pipe;
- this.ipAddress = ipAddress;
- this.port = port;
+ public List<PipeData> getPipeDataList() {
+ return pipeDataList;
}
@Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- PipeData pipeData = pipe.take();
- pipeDataList.add(pipeData);
- pipe.commit();
- }
- } catch (InterruptedException e) {
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public boolean handshake() {
+ return true;
}
- public List<PipeData> getPipeDataList() {
- return pipeDataList;
+ @Override
+ public boolean send(PipeData pipeData) {
+ pipeDataList.add(pipeData);
+ return true;
}
+
+ @Override
+ public void close() {}
}
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
deleted file mode 100644
index d54e1f9805..0000000000
---
a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.integration.sync;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
-
-public class TransportHandlerMock extends TransportHandler {
- private Pipe pipe;
-
- public TransportHandlerMock(
- Pipe pipe, IoTDBPipeSink pipeSink, TransportClientMock transportClientMock) {
- super(pipe, pipeSink);
- this.transportClient = transportClientMock;
- }
-
- @Override
- protected void resetTransportClient(Pipe pipe) {
- try {
- super.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- super.resetTransportClient(pipe);
- IoTDBPipeSink pipeSink = (IoTDBPipeSink) pipe.getPipeSink();
- ((TransportClientMock) this.transportClient)
- .resetInfo(pipe, pipeSink.getIp(), pipeSink.getPort());
- this.pipe = pipe;
-
- this.transportExecutorService =
- IoTDBThreadPoolFactory.newSingleThreadExecutor(
- ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
- }
-
- public TransportClientMock getTransportClientMock() {
- return (TransportClientMock) transportClient;
- }
-}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index bf9f4afb09..81f54e7026 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -72,6 +72,8 @@ public class SyncConstant {
public static final int DATA_CHUNK_SIZE =
Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
+ public static final String PATCH_SUFFIX = ".patch";
+
/** receiver */
public static final String RECEIVER_DIR_NAME = "receiver";
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index c5b75f3bab..bab10ce4e3 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1462,15 +1462,13 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
@Override
- public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
- throws TException {
- return SyncService.getInstance().transportData(metaInfo, buff, digest);
+ public TSStatus sendPipeData(ByteBuffer buff) throws TException {
+ return SyncService.getInstance().transportPipeData(buff);
}
@Override
- public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
- throws TException {
- return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+ public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException {
+ return SyncService.getInstance().transportFile(metaInfo, buff);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index c439813c1f..0654444a79 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -2102,15 +2102,13 @@ public class TSServiceImpl implements
IClientRPCServiceWithHandler {
}
@Override
- public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
- throws TException {
- return SyncService.getInstance().transportData(metaInfo, buff, digest);
+ public TSStatus sendPipeData(ByteBuffer buff) throws TException {
+ return SyncService.getInstance().transportPipeData(buff);
}
@Override
- public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
- throws TException {
- return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+ public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException {
+ return SyncService.getInstance().transportFile(metaInfo, buff);
}
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 46eaa93eab..7fdcb7cb91 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
@@ -39,14 +40,14 @@ import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+import org.apache.iotdb.db.sync.transport.client.SenderManager;
import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
@@ -72,15 +73,16 @@ public class SyncService implements IService {
private Pipe runningPipe;
- private TransportHandler transportHandler;
-
/* handle external Pipe */
private ExtPipePluginManager extPipePluginManager;
private ISyncInfoFetcher syncInfoFetcher =
LocalSyncInfoFetcher.getInstance();
+ /* handle rpc send logic in sender-side*/
+ private SenderManager senderManager;
+
/* handle rpc in receiver-side*/
- private ReceiverManager receiverManager;
+ private final ReceiverManager receiverManager;
private SyncService() {
receiverManager = new ReceiverManager();
@@ -102,15 +104,13 @@ public class SyncService implements IService {
return receiverManager.handshake(identityInfo);
}
- public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+ public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
throws TException {
- return receiverManager.transportData(metaInfo, buff, digest);
+ return receiverManager.transportFile(metaInfo, buff);
}
- // TODO: this will be deleted later
- public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
- throws TException {
- return receiverManager.checkFileDigest(metaInfo, digest);
+ public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+ return receiverManager.transportPipeData(buff);
}
public void handleClientExit() {
@@ -168,8 +168,7 @@ public class SyncService implements IService {
runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(plan,
runningPipeSink, currentTime);
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
try {
- transportHandler =
- TransportHandler.getNewTransportHandler(runningPipe, (IoTDBPipeSink) runningPipeSink);
+ senderManager = new SenderManager(runningPipe, (IoTDBPipeSink) runningPipeSink);
} catch (ClassCastException e) {
logger.error(
String.format(
@@ -193,7 +192,7 @@ public class SyncService implements IService {
if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
runningPipe.stop();
- transportHandler.stop();
+ senderManager.stop();
} else { // for external PIPE
// == pause externalPipeProcessor's task
if (extPipePluginManager != null) {
@@ -217,7 +216,7 @@ public class SyncService implements IService {
if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
runningPipe.start();
- transportHandler.start();
+ senderManager.start();
} else { // for external PIPE
runningPipe.start();
startExternalPipeManager(true);
@@ -230,7 +229,7 @@ public class SyncService implements IService {
checkRunningPipeExistAndName(pipeName);
try {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
- if (!transportHandler.close()) {
+ if (!senderManager.close()) {
throw new PipeException(
String.format(
"Close pipe %s transport error after %s %s, please try
again.",
@@ -390,7 +389,7 @@ public class SyncService implements IService {
if (externalPipeSinkWriterFactory == null) {
logger.error(
String.format(
- "startExternalPipeManager(), can not found ExternalPipe plugin for {}.",
+ "startExternalPipeManager(), can not found ExternalPipe plugin for %s.",
extPipeSinkTypeName));
throw new PipeException("Can not found ExternalPipe plugin for " +
extPipeSinkTypeName + ".");
}
@@ -446,7 +445,7 @@ public class SyncService implements IService {
if (runningPipe != null &&
!Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
try {
runningPipe.stop();
- transportHandler.stop();
+ senderManager.stop();
} catch (PipeException e) {
logger.warn(
String.format("Stop pipe %s error when stop Sender Service.",
runningPipe.getName()),
@@ -460,7 +459,7 @@ public class SyncService implements IService {
if (runningPipe != null &&
!Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
try {
runningPipe.stop();
- transportHandler.close();
+ senderManager.close();
runningPipe.close();
} catch (PipeException | InterruptedException e) {
logger.warn(
@@ -503,15 +502,19 @@ public class SyncService implements IService {
}
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
- this.transportHandler =
- TransportHandler.getNewTransportHandler(
- runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
+ this.senderManager =
+ new SenderManager(runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
if (Pipe.PipeStatus.RUNNING.equals(runningPipe.getStatus())) {
- transportHandler.start();
+ senderManager.start();
}
} else { // for external pipe
// == start ExternalPipeProcessor for send data to external pipe plugin
startExternalPipeManager(runningPipe.getStatus() ==
Pipe.PipeStatus.RUNNING);
}
}
+
+ @TestOnly
+ public SenderManager getSenderManager() {
+ return senderManager;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index f13483444d..ea85433f22 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.sync.common;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import java.util.List;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index e74c6748ea..3a89cdc0f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 936d4a7ea7..6b08525723 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index ac5cd39397..b848ab1959 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 3408d2d5ef..01419e6f09 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import java.io.BufferedWriter;
import java.io.File;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
rename to
server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
index 8d81daaaf9..1a313d5158 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/PipeMessage.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.sync.receiver.manager;
+package org.apache.iotdb.db.sync.sender.pipe;
import java.util.Objects;
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
deleted file mode 100644
index d8f96ac28e..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.sender.service;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.sync.SyncConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-public class TransportHandler {
- private static final Logger logger = LoggerFactory.getLogger(TransportHandler.class);
- private static TransportHandler DEBUG_TRANSPORT_HANDLER = null; // test only
-
- private String pipeName;
- private long createTime;
- private final String localIP;
- protected ITransportClient transportClient;
- private final Pipe pipe;
-
- protected ExecutorService transportExecutorService;
- private Future transportFuture;
-
- public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
- this.pipe = pipe;
- this.pipeName = pipe.getName();
- this.createTime = pipe.getCreateTime();
-
- this.transportExecutorService =
- IoTDBThreadPoolFactory.newSingleThreadExecutor(
- ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipeName);
-
- this.localIP = getLocalIP(pipeSink);
- this.transportClient =
- new IoTDBSinkTransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
- }
-
- private String getLocalIP(IoTDBPipeSink pipeSink) {
- String localIP;
- try {
- InetAddress inetAddress = InetAddress.getLocalHost();
- if (inetAddress.isLoopbackAddress()) {
- try (final DatagramSocket socket = new DatagramSocket()) {
- socket.connect(InetAddress.getByName(pipeSink.getIp()), pipeSink.getPort());
- localIP = socket.getLocalAddress().getHostAddress();
- }
- } else {
- localIP = inetAddress.getHostAddress();
- }
- } catch (UnknownHostException | SocketException e) {
- logger.error(String.format("Get local host error when create transport handler."), e);
- localIP = SyncConstant.UNKNOWN_IP;
- }
- return localIP;
- }
-
- public void start() {
- transportFuture = transportExecutorService.submit(transportClient);
- }
-
- public void stop() {
- if (transportFuture != null) {
- transportFuture.cancel(true);
- }
- }
-
- public boolean close() throws InterruptedException {
- boolean isClosed;
- transportExecutorService.shutdownNow();
- isClosed =
- transportExecutorService.awaitTermination(
- SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, TimeUnit.MILLISECONDS);
- return isClosed;
- }
-
- public static TransportHandler getNewTransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
- if (DEBUG_TRANSPORT_HANDLER == null) {
- return new TransportHandler(pipe, pipeSink);
- }
- DEBUG_TRANSPORT_HANDLER.resetTransportClient(pipe); // test only
- return DEBUG_TRANSPORT_HANDLER;
- }
-
- /** test */
- @TestOnly
- public static void setDebugTransportHandler(TransportHandler transportHandler) {
- DEBUG_TRANSPORT_HANDLER = transportHandler;
- }
-
- @TestOnly
- protected void resetTransportClient(Pipe pipe) {
- this.pipeName = pipe.getName();
- this.createTime = pipe.getCreateTime();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
deleted file mode 100644
index b96d6b3ead..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.transport.client;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.sync.SyncConstant;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
-import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClientWrapper {
- private static final Logger logger =
LoggerFactory.getLogger(ClientWrapper.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
-
- private TTransport transport = null;
- private volatile IClientRPCService.Client serviceClient = null;
-
- /* remote IP address*/
- private final String ipAddress;
- /* remote port */
- private final int port;
- /* local IP address*/
- private final String localIP;
-
- private final Pipe pipe;
-
- public ClientWrapper(Pipe pipe, String ipAddress, int port, String localIP) {
- this.pipe = pipe;
- this.ipAddress = ipAddress;
- this.port = port;
- this.localIP = localIP;
- }
-
- public IClientRPCService.Client getClient() {
- return serviceClient;
- }
-
- /**
- * create connection and handshake before sending messages
- *
- * @return true if success; false if failed to check IoTDB version.
- * @throws SyncConnectionException cannot create connection to receiver
- */
- public boolean handshakeWithVersion() throws SyncConnectionException {
- if (transport != null && transport.isOpen()) {
- transport.close();
- }
-
- try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- ipAddress,
- port,
- SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
- SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
- TProtocol protocol;
- if (config.isRpcThriftCompressionEnable()) {
- protocol = new TCompactProtocol(transport);
- } else {
- protocol = new TBinaryProtocol(transport);
- }
- serviceClient = new IClientRPCService.Client(protocol);
-
- // Underlay socket open.
- if (!transport.isOpen()) {
- transport.open();
- }
-
- TSyncIdentityInfo identityInfo =
- new TSyncIdentityInfo(
- localIP, pipe.getName(), pipe.getCreateTime(),
config.getIoTDBMajorVersion());
- TSStatus status = serviceClient.handshake(identityInfo);
- if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logger.error("The receiver rejected the synchronization task because
{}", status.message);
- return false;
- }
- } catch (TException e) {
- logger.warn("Cannot connect to the receiver because {}", e.getMessage());
- throw new SyncConnectionException(
- String.format("Cannot connect to the receiver because %s.",
e.getMessage()));
- }
- return true;
- }
-
- public void close() {
- if (transport != null) {
- transport.close();
- transport = null;
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
index 68a87d7655..c06398807d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java
@@ -19,4 +19,25 @@
*/
package org.apache.iotdb.db.sync.transport.client;
-public interface ITransportClient extends Runnable {}
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+
+public interface ITransportClient {
+ /**
+ * Create connection and handshake before sending messages
+ *
+ * @return true if success; false if failed to check IoTDB version.
+ * @throws SyncConnectionException cannot create connection to receiver
+ */
+ boolean handshake() throws SyncConnectionException;
+
+ /**
+ * Send {@link PipeData} to receiver and load.
+ *
+ * @return true if success; false if failed to send or load.
+ * @throws SyncConnectionException cannot create connection to receiver
+ */
+ boolean send(PipeData pipeData) throws SyncConnectionException;
+
+ void close();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
index 14c5da4b50..53b6cdf119 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
@@ -24,33 +24,33 @@ import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
-import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static
org.apache.iotdb.db.sync.transport.conf.TransportConfig.isCheckFileDegistAgain;
public class IoTDBSinkTransportClient implements ITransportClient {
@@ -60,8 +60,8 @@ public class IoTDBSinkTransportClient implements
ITransportClient {
private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
- private final ClientWrapper serviceClient;
- private final ClientWrapper heartbeatClient;
+ private TTransport transport = null;
+ private volatile IClientRPCService.Client serviceClient = null;
/* remote IP address*/
private final String ipAddress;
@@ -84,234 +84,137 @@ public class IoTDBSinkTransportClient implements
ITransportClient {
this.ipAddress = ipAddress;
this.port = port;
this.localIP = localIP;
- serviceClient = new ClientWrapper(pipe, ipAddress, port, localIP);
- heartbeatClient = new ClientWrapper(pipe, ipAddress, port, localIP);
}
/**
- * Create thrift connection to receiver. (1) register pipe message,
including pipeName, localIp
- * and createTime (2) check IoTDB version to make sure compatibility
+ * Create thrift connection to receiver. Check IoTDB version to make sure
compatibility
*
- * @return true if success; false if failed MaxNumberOfSyncFileRetry times.
+ * @return true if success; false if failed to check IoTDB version.
* @throws SyncConnectionException cannot create connection to receiver
*/
+ @Override
public synchronized boolean handshake() throws SyncConnectionException {
- for (int handshakeCounter = 0;
- handshakeCounter < config.getMaxNumberOfSyncFileRetry();
- handshakeCounter++) {
- try {
- return serviceClient.handshakeWithVersion();
- } catch (SyncConnectionException e) {
- logger.warn(
- String.format(
- "Handshake error, retry %d/%d.",
- handshakeCounter, config.getMaxNumberOfSyncFileRetry()));
- }
+ if (transport != null && transport.isOpen()) {
+ transport.close();
}
- if (!serviceClient.handshakeWithVersion()) {
- logger.info(
- String.format("Handshake failed %s times!",
config.getMaxNumberOfSyncFileRetry()));
- return false;
- } else {
- return true;
- }
- }
- public boolean senderTransport(PipeData pipeData) throws
SyncConnectionException {
- if (pipeData instanceof TsFilePipeData) {
- try {
- for (File file : ((TsFilePipeData) pipeData).getTsFiles(true)) {
- transportSingleFile(file);
- }
- } catch (IOException e) {
- logger.error(String.format("Get tsfiles error, because %s.", e), e);
- return false;
- } catch (NoSuchAlgorithmException e) {
- logger.error(String.format("Wrong message digest, because %s.", e), e);
- return false;
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ ipAddress,
+ port,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
+ TProtocol protocol;
+ if (config.isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
}
- }
-
- int retryCount = 0;
+ serviceClient = new IClientRPCService.Client(protocol);
- while (true) {
- retryCount++;
- if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
- logger.error(
- String.format("After %s tries, stop the transport of current
pipeData!", retryCount));
- throw new SyncConnectionException(
- String.format("Can not connect to receiver when transferring
pipedata %s.", pipeData));
+ // Underlay socket open.
+ if (!transport.isOpen()) {
+ transport.open();
}
- try {
- transportPipeData(pipeData);
- logger.info(String.format("Finish pipeData %s transport.", pipeData));
- break;
- } catch (SyncConnectionException e) {
- // handshake and retry
- try {
- if (!handshake()) {
- logger.error(
- String.format(
- "Handshake to receiver %s:%d error when transfer pipe data
%s.",
- ipAddress, port, pipeData));
- return false;
- }
- } catch (SyncConnectionException syncConnectionException) {
- logger.error(
- String.format(
- "Reconnect to receiver %s:%d error when transfer pipe data
%s.",
- ipAddress, port, pipeData));
- throw new SyncConnectionException(
- String.format(
- "Reconnect to receiver error when transferring pipedata
%s.", pipeData));
- }
- } catch (NoSuchAlgorithmException e) {
- logger.error("Transport failed. ", e);
+ TSyncIdentityInfo identityInfo =
+ new TSyncIdentityInfo(
+ localIP, pipe.getName(), pipe.getCreateTime(),
config.getIoTDBMajorVersion());
+ TSStatus status = serviceClient.handshake(identityInfo);
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("The receiver rejected the synchronization task because
{}", status.message);
return false;
}
+ } catch (TException e) {
+ logger.warn("Cannot connect to the receiver because {}", e.getMessage());
+ throw new SyncConnectionException(
+ String.format("Cannot connect to the receiver because %s.",
e.getMessage()));
}
return true;
}
- /** Transfer data of a tsfile to the receiver. */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- private void transportSingleFile(File file)
- throws SyncConnectionException, NoSuchAlgorithmException {
-
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
- throw new SyncConnectionException(
- String.format("Connect to receiver error when transferring file
%s.", file.getName()));
- }
-
+ /**
+ * Send {@link PipeData} to receiver and load. If PipeData is
TsFilePipeData, The TsFiles will be
+ * transferred before the PipeData transfer.
+ *
+ * @return true if success; false if failed to send or load.
+ * @throws SyncConnectionException cannot create connection to receiver
+ */
+ @Override
+ public boolean send(PipeData pipeData) throws SyncConnectionException {
+ if (pipeData instanceof TsFilePipeData) {
try {
- transportSingleFilePieceByPiece(file, messageDigest);
-
- if (isCheckFileDegistAgain) {
- // Check file digest as entirety.
- try {
- if (!checkFileDigest(file, messageDigest)) {
- continue;
- }
- } catch (IOException e) {
- logger.warn(
- String.format(
- "Read from disk to make digest error, skip check file %s,
because %s.",
- file.getName(), e));
- }
- }
- break;
- } catch (SyncConnectionException e) {
- // handshake and retry
- try {
- if (!handshake()) {
- throw new SyncConnectionException(
- String.format(
- "Handshake with receiver error when transferring file
%s.", file.getName()));
+ for (File file : ((TsFilePipeData) pipeData).getTsFiles(true)) {
+ if (!transportSingleFilePieceByPiece(file)) {
+ return false;
}
- } catch (SyncConnectionException syncConnectionException) {
- throw new SyncConnectionException(
- String.format(
- "Connect to receiver error when transferring file %s.",
file.getName()));
}
+ } catch (IOException e) {
+ logger.error(String.format("Get TsFiles error, because %s.", e), e);
+ return false;
}
}
-
- logger.info("Receiver has received {} successfully.",
file.getAbsoluteFile());
+ try {
+ return transportPipeData(pipeData);
+ } catch (IOException e) {
+ logger.error(String.format("Transport PipeData error, because %s.", e),
e);
+ return false;
+ }
}
- private void transportSingleFilePieceByPiece(File file, MessageDigest
messageDigest)
- throws SyncConnectionException {
-
+ /**
+ * Transport file piece by piece.
+ *
+ * @return true if success; false if failed.
+ * @throws SyncConnectionException Connection exception, wait for a while
and try again
+ * @throws IOException Serialize error.
+ */
+ private boolean transportSingleFilePieceByPiece(File file)
+ throws SyncConnectionException, IOException {
// Cut the file into pieces to send
long position = 0;
-
long limit = getFileSizeLimit(file);
// Try small piece to rebase the file position.
byte[] buffer = new byte[TRANSFER_BUFFER_SIZE_IN_BYTES];
-
- outer:
- while (true) {
-
- // Normal piece.
- if (position != 0L && buffer.length != DATA_CHUNK_SIZE) {
- buffer = new byte[DATA_CHUNK_SIZE];
- }
-
- int dataLength;
- try (RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"r")) {
- if (limit <= position) {
+ int dataLength;
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+ while (position < limit) {
+ // Normal piece.
+ if (position != 0L && buffer.length != DATA_CHUNK_SIZE) {
+ buffer = new byte[DATA_CHUNK_SIZE];
+ }
+ dataLength =
+ randomAccessFile.read(buffer, 0, Math.min(buffer.length, (int)
(limit - position)));
+ if (dataLength == -1) {
break;
}
- randomAccessFile.seek(position);
- while ((dataLength =
- randomAccessFile.read(buffer, 0, Math.min(buffer.length, (int)
(limit - position))))
- != -1) {
- messageDigest.reset();
- messageDigest.update(buffer, 0, dataLength);
- ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
- TSyncTransportMetaInfo metaInfo =
- new TSyncTransportMetaInfo(TSyncTransportType.FILE,
file.getName(), position);
+ ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
+ TSyncTransportMetaInfo metaInfo = new
TSyncTransportMetaInfo(file.getName(), position);
- TSStatus status = null;
- int retryCount = 0;
- while (true) {
- retryCount++;
- if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
- throw new SyncConnectionException(
- String.format(
- "Can not sync file %s after %s tries.",
- file.getAbsoluteFile(),
config.getMaxNumberOfSyncFileRetry()));
- }
- try {
- status =
- serviceClient
- .getClient()
- .transportData(metaInfo, buffToSend,
ByteBuffer.wrap(messageDigest.digest()));
- } catch (TException e) {
- // retry
- logger.error("TException happened! ", e);
- continue;
- }
- break;
- }
+ TSStatus status = serviceClient.sendFile(metaInfo, buffToSend);
- if (status.code == TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
- position = Long.parseLong(status.message);
- continue outer;
- } else if (status.code ==
TSStatusCode.SYNC_FILE_RETRY.getStatusCode()) {
- logger.info(
- "Receiver failed to receive data from {} because {}, retry.",
- file.getAbsoluteFile(),
- status.message);
- continue outer;
- } else if (status.code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logger.info(
- "Receiver failed to receive data from {} because {}, abort.",
- file.getAbsoluteFile(),
- status.message);
- throw new SyncConnectionException(status.message);
- } else { // Success
- position += dataLength;
- if (position >= limit) {
- break;
- }
- }
+ if ((status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode())) {
+ // Success
+ position += dataLength;
+ } else if (status.code ==
TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
+ position = Long.parseLong(status.message);
+ } else if (status.code ==
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
+ logger.error(
+ "Receiver failed to receive data from {} because {}, abort.",
+ file.getAbsoluteFile(),
+ status.message);
+ return false;
}
- } catch (IOException e) {
- // retry
- logger.error("IOException happened! ", e);
- } catch (SyncConnectionException e) {
- logger.error("Cannot sync data with receiver. ", e);
- throw e;
}
+ } catch (TException e) {
+ logger.error("Cannot sync data with receiver. ", e);
+ throw new SyncConnectionException(e);
}
+ return true;
}
private long getFileSizeLimit(File file) {
@@ -327,142 +230,35 @@ public class IoTDBSinkTransportClient implements
ITransportClient {
return file.length();
}
- private boolean checkFileDigest(File file, MessageDigest messageDigest)
- throws SyncConnectionException, IOException {
- messageDigest.reset();
- try (InputStream inputStream = new FileInputStream(file)) {
- byte[] block = new byte[DATA_CHUNK_SIZE];
- int length;
- while ((length = inputStream.read(block)) > 0) {
- messageDigest.update(block, 0, length);
- }
- }
-
- TSyncTransportMetaInfo metaInfo =
- new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), 0);
-
- TSStatus status;
- int retryCount = 0;
-
- while (true) {
- retryCount++;
- if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
- throw new SyncConnectionException(
- String.format(
- "Can not sync file %s after %s tries.",
- file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
- }
- try {
- status =
- serviceClient
- .getClient()
- .checkFileDigest(metaInfo,
ByteBuffer.wrap(messageDigest.digest()));
- } catch (TException e) {
- // retry
- logger.error("TException happens! ", e);
- continue;
- }
- break;
- }
-
- if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logger.error("Digest check of tsfile {} failed, retry",
file.getAbsoluteFile());
- return false;
- }
- return true;
- }
-
- private void transportPipeData(PipeData pipeData)
- throws SyncConnectionException, NoSuchAlgorithmException {
-
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
-
- int retryCount = 0;
- while (true) {
-
- retryCount++;
- if (retryCount > config.getMaxNumberOfSyncFileRetry()) {
- throw new SyncConnectionException(
- String.format(
- "Can not sync pipe data after %s tries.",
config.getMaxNumberOfSyncFileRetry()));
- }
-
- try {
- byte[] buffer = pipeData.serialize();
- messageDigest.reset();
- messageDigest.update(buffer);
- ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
-
- TSyncTransportMetaInfo metaInfo =
- new TSyncTransportMetaInfo(
- TSyncTransportType.findByValue(pipeData.getType().ordinal()),
"fileName", 0);
- TSStatus status =
- serviceClient
- .getClient()
- .transportData(metaInfo, buffToSend,
ByteBuffer.wrap(messageDigest.digest()));
-
- if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- break;
- } else {
- logger.error("Digest check of pipeData failed, retry");
- }
- } catch (IOException | TException e) {
- // retry
- logger.error("Exception happened!", e);
- }
- }
- }
-
/**
- * When an object implementing interface <code>Runnable</code> is used to
create a thread,
- * starting the thread causes the object's <code>run</code> method to be
called in that separately
- * executing thread.
- *
- * <p>The general contract of the method <code>run</code> is that it may
take any action
- * whatsoever.
+ * Transport and load PipeData
*
- * @see Thread#run()
+ * @return true if success; false if failed.
+ * @throws SyncConnectionException Connection exception, wait for a while
and try again
+ * @throws IOException Serialize error.
*/
- @Override
- public void run() {
+ private boolean transportPipeData(PipeData pipeData) throws
SyncConnectionException, IOException {
try {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- if (!handshake()) {
- SyncService.getInstance()
- .receiveMsg(
- PipeMessage.MsgType.ERROR,
- String.format("Can not handshake with %s:%d.", ipAddress,
port));
- }
- while (!Thread.currentThread().isInterrupted()) {
- PipeData pipeData = pipe.take();
- if (!senderTransport(pipeData)) {
- logger.error(String.format("Can not transfer pipedata %s, skip
it.", pipeData));
- // can do something.
- SyncService.getInstance()
- .receiveMsg(
- PipeMessage.MsgType.WARN,
- String.format(
- "Transfer piepdata %s error, skip it.",
pipeData.getSerialNumber()));
- continue;
- }
- pipe.commit();
- }
- } catch (SyncConnectionException e) {
- logger.error(
- String.format("Connect to receiver %s:%d error, because %s.",
ipAddress, port, e));
- // TODO: wait and retry
- }
+ byte[] buffer = pipeData.serialize();
+ ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+ TSStatus status = serviceClient.sendPipeData(buffToSend);
+ if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.info("Transport PipeData {} Successfully", pipeData);
+ } else if (status.code == TSStatusCode.PIPESERVER_ERROR.getStatusCode())
{
+ logger.error("Receiver failed to load PipeData {}, skip it.",
pipeData);
+ return false;
}
- } catch (InterruptedException e) {
- logger.info("Interrupted by pipe, exit transport.");
- } finally {
- close();
+ } catch (TException e) {
+ throw new SyncConnectionException(e);
}
+ return true;
}
+ @Override
public void close() {
- serviceClient.close();
- heartbeatClient.close();
+ if (transport != null) {
+ transport.close();
+ transport = null;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
new file mode 100644
index 0000000000..cf323c04fd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class SenderManager {
+ private static final Logger logger =
LoggerFactory.getLogger(SenderManager.class);
+
+ protected ITransportClient transportClient;
+ private Pipe pipe;
+ private PipeSink pipeSink;
+
+ protected ExecutorService transportExecutorService;
+ private Future transportFuture;
+
+ public SenderManager(Pipe pipe, IoTDBPipeSink pipeSink) {
+ this.pipe = pipe;
+ this.pipeSink = pipeSink;
+ this.transportExecutorService =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
+ ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
+ this.transportClient = TransportClientFactory.createTransportClient(pipe,
pipeSink);
+ }
+
+ public void start() {
+ transportFuture =
transportExecutorService.submit(this::takePipeDataAndTransport);
+ }
+
+ public void stop() {
+ if (transportFuture != null) {
+ transportFuture.cancel(true);
+ }
+ }
+
+ public boolean close() throws InterruptedException {
+ boolean isClosed;
+ transportExecutorService.shutdownNow();
+ isClosed =
+ transportExecutorService.awaitTermination(
+ SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS,
TimeUnit.MILLISECONDS);
+ return isClosed;
+ }
+
+ private void takePipeDataAndTransport() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ if (!transportClient.handshake()) {
+ SyncService.getInstance()
+ .receiveMsg(
+ PipeMessage.MsgType.ERROR,
+ String.format("Can not handshake with %s", pipeSink));
+ }
+ while (!Thread.currentThread().isInterrupted()) {
+ PipeData pipeData = pipe.take();
+ if (!transportClient.send(pipeData)) {
+ logger.error(String.format("Can not transfer pipedata %s, skip
it.", pipeData));
+ // can do something.
+ SyncService.getInstance()
+ .receiveMsg(
+ PipeMessage.MsgType.WARN,
+ String.format(
+ "Transfer piepdata %s error, skip it.",
pipeData.getSerialNumber()));
+ continue;
+ }
+ pipe.commit();
+ }
+ } catch (SyncConnectionException e) {
+ logger.error(String.format("Connect to receiver %s error, because
%s.", pipeSink, e));
+ // TODO: wait and retry
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.info("Interrupted by pipe, exit transport.");
+ } finally {
+ transportClient.close();
+ }
+ }
+
+ /** test */
+ @TestOnly
+ public void setTransportClient(ITransportClient transportClient) {
+ this.transportClient = transportClient;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java
new file mode 100644
index 0000000000..cf176bb170
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClientFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.transport.client;
+
+import org.apache.iotdb.commons.sync.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+public class TransportClientFactory {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TransportClientFactory.class);
+
+ public static ITransportClient createTransportClient(Pipe pipe, PipeSink
pipeSink) {
+ switch (pipeSink.getType()) {
+ case IoTDB:
+ IoTDBPipeSink ioTDBPipeSink = (IoTDBPipeSink) pipeSink;
+ return new IoTDBSinkTransportClient(
+ pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort(),
getLocalIP(ioTDBPipeSink));
+ case ExternalPipe:
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static String getLocalIP(IoTDBPipeSink pipeSink) {
+ String localIP;
+ try {
+ InetAddress inetAddress = InetAddress.getLocalHost();
+ if (inetAddress.isLoopbackAddress()) {
+ try (final DatagramSocket socket = new DatagramSocket()) {
+ socket.connect(InetAddress.getByName(pipeSink.getIp()),
pipeSink.getPort());
+ localIP = socket.getLocalAddress().getHostAddress();
+ }
+ } else {
+ localIP = inetAddress.getHostAddress();
+ }
+ } catch (UnknownHostException | SocketException e) {
+ logger.error("Get local host error when create transport handler.", e);
+ localIP = SyncConstant.UNKNOWN_IP;
+ }
+ return localIP;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java
deleted file mode 100644
index 28a27aa491..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/conf/TransportConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.conf;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-
-import java.io.File;
-
-public class TransportConfig {
- private TransportConfig() {}
-
- /** default base dir, stores all IoTDB runtime files */
- private static final String DEFAULT_BASE_DIR = addHomeDir("data");
-
- private static String addHomeDir(String dir) {
- String homeDir = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
- if (!new File(dir).isAbsolute() && homeDir != null && homeDir.length() >
0) {
- if (!homeDir.endsWith(File.separator)) {
- dir = homeDir + File.separatorChar + dir;
- } else {
- dir = homeDir + dir;
- }
- }
- return dir;
- }
-
- public static boolean isCheckFileDegistAgain = false;
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index 2a970a67b9..c62220b709 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -32,36 +32,22 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.RandomAccessFile;
-import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-
/**
* This class is responsible for implementing the RPC processing on the
receiver-side. It should
* only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
@@ -70,13 +56,13 @@ public class ReceiverManager {
private static Logger logger =
LoggerFactory.getLogger(ReceiverManager.class);
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final String RECORD_SUFFIX = ".record";
- private static final String PATCH_SUFFIX = ".patch";
// When the client abnormally exits, we can still know who to disconnect
private final ThreadLocal<Long> currentConnectionId;
// Record the remote message for every rpc connection
private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
+ // Record the next expected start index of each file for every rpc connection
+ private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
// The sync connectionId is unique in one IoTDB instance.
private final AtomicLong connectionIdGenerator;
@@ -84,6 +70,7 @@ public class ReceiverManager {
public ReceiverManager() {
currentConnectionId = new ThreadLocal<>();
connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
+ connectionIdToStartIndexRecord = new ConcurrentHashMap<>();
connectionIdGenerator = new AtomicLong();
}
@@ -108,52 +95,50 @@ public class ReceiverManager {
}
private CheckResult checkStartIndexValid(File file, long startIndex) throws
IOException {
- File recordFile = new File(file.getAbsolutePath() + RECORD_SUFFIX);
-
- if (!recordFile.exists() && startIndex != 0) {
+ // get local index from memory map
+ long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
+ // get local index from file
+ if (localIndex < 0 && file.exists()) {
+ localIndex = file.length();
+ recordStartIndex(file, localIndex);
+ }
+ // compare and check
+ if (localIndex < 0 && startIndex != 0) {
logger.error(
"The start index {} of data sync is not valid. "
- + "The file {} is not exist and start index should equal to 0).",
- startIndex,
- recordFile.getAbsolutePath());
+ + "The file is not exist and start index should equal to 0).",
+ startIndex);
return new CheckResult(false, "0");
+ } else if (localIndex >= 0 && localIndex != startIndex) {
+ logger.error(
+ "The start index {} of data sync is not valid. "
+ + "The start index of the file should equal to {}.",
+ startIndex,
+ localIndex);
+ return new CheckResult(false, String.valueOf(localIndex));
}
+ return new CheckResult(true, "0");
+ }
- if (recordFile.exists()) {
- try (InputStream inputStream = new FileInputStream(recordFile);
- BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(inputStream))) {
- String index = bufferedReader.readLine();
-
- if ((index == null) || (index.length() == 0)) {
- if (startIndex != 0) {
- logger.error(
- "The start index {} of data sync is not valid. "
- + "The file {} is not exist and start index is should
equal to 0.",
- startIndex,
- recordFile.getAbsolutePath());
- return new CheckResult(false, "0");
- }
- }
-
- if (Long.parseLong(index) != startIndex) {
- logger.error(
- "The start index {} of data sync is not valid. "
- + "The start index of the file {} should equal to {}.",
- startIndex,
- recordFile.getAbsolutePath(),
- index);
- return new CheckResult(false, index);
- }
- }
+ private void recordStartIndex(File file, long position) {
+ Long id = currentConnectionId.get();
+ if (id != null) {
+ Map<String, Long> map =
+ connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new
ConcurrentHashMap<>());
+ map.put(file.getAbsolutePath(), position);
}
-
- return new CheckResult(true, "0");
}
// endregion
// region Interfaces and Implementation of RPC Handler
+ /**
+ * Create connection from sender
+ *
+ * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to connect; {@link
+ * TSStatusCode#SUCCESS_STATUS} if success to connect.
+ */
public TSStatus handshake(TSyncIdentityInfo identityInfo) {
logger.debug("Invoke handshake method from client ip = {}",
identityInfo.address);
// check ip address
@@ -218,157 +203,114 @@ public class ReceiverManager {
return ipAddressBinary.equals(ipSegmentBinary);
}
- public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer
buff, ByteBuffer digest)
- throws TException {
+ /**
+ * Receive {@link PipeData} and load it into IoTDB Engine.
+ *
+ * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load;
{@link
+ * TSStatusCode#SUCCESS_STATUS} if load successfully.
+ * @throws TException The connection between the sender and the receiver has
not been established
+ * by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+ */
+ public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+ // step1. check connection
TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
if (identityInfo == null) {
throw new TException("Thrift connection is not alive.");
}
- logger.debug("Invoke transportData method from client ip = {}",
identityInfo.address);
-
+ logger.debug("Invoke transportPipeData method from client ip = {}",
identityInfo.address);
String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
- TSyncTransportType type = metaInfo.type;
- String fileName = metaInfo.fileName;
- long startIndex = metaInfo.startIndex;
- // Check file start index valid
- if (type == TSyncTransportType.FILE) {
- try {
- CheckResult result = checkStartIndexValid(new File(fileDir, fileName),
startIndex);
- if (!result.isResult()) {
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE,
result.getIndex());
- }
- } catch (IOException e) {
- logger.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR,
e.getMessage());
- }
- }
-
- // Check buff digest
- int pos = buff.position();
- MessageDigest messageDigest = null;
+ // step2. deserialize PipeData
+ PipeData pipeData;
try {
- messageDigest = MessageDigest.getInstance("SHA-256");
- } catch (NoSuchAlgorithmException e) {
- logger.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
- }
- messageDigest.update(buff);
- byte[] digestBytes = new byte[digest.capacity()];
- digest.get(digestBytes);
- if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest
check error");
- }
-
- if (type != TSyncTransportType.FILE) {
- buff.position(pos);
int length = buff.capacity();
byte[] byteArray = new byte[length];
buff.get(byteArray);
- try {
- PipeData pipeData = PipeData.createPipeData(byteArray);
- if (type == TSyncTransportType.TSFILE) {
- // Do with file
- handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
- }
- logger.info(
- "Start load pipeData with serialize number {} and type
{},value={}",
- pipeData.getSerialNumber(),
- pipeData.getType(),
- pipeData);
- pipeData.createLoader().load();
- logger.info(
- "Load pipeData with serialize number {} successfully.",
pipeData.getSerialNumber());
- } catch (IOException | IllegalPathException e) {
- logger.error("Pipe data transport error, {}", e.getMessage());
- return RpcUtils.getStatus(
- TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " +
e.getMessage());
- } catch (PipeDataLoadException e) {
- logger.error("Fail to load pipeData because {}.", e.getMessage());
- return RpcUtils.getStatus(
- TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " +
e.getMessage());
- }
- } else {
- // Write buff to {file}.patch
- buff.position(pos);
- File file = new File(fileDir, fileName + PATCH_SUFFIX);
- try (RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw")) {
- randomAccessFile.seek(startIndex);
- int length = buff.capacity();
- byte[] byteArray = new byte[length];
- buff.get(byteArray);
- randomAccessFile.write(byteArray);
- writeRecordFile(new File(fileDir, fileName + RECORD_SUFFIX),
startIndex + length);
- logger.debug(
- "Sync "
- + fileName
- + " start at "
- + startIndex
- + " to "
- + (startIndex + length)
- + " is done.");
- } catch (IOException e) {
- logger.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR,
e.getMessage());
+ pipeData = PipeData.createPipeData(byteArray);
+ if (pipeData instanceof TsFilePipeData) {
+ handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
}
+ } catch (IOException | IllegalPathException e) {
+ logger.error("Pipe data transport error, {}", e.getMessage());
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " +
e.getMessage());
}
+
+ // step3. load PipeData
+ logger.info(
+ "Start load pipeData with serialize number {} and type {},value={}",
+ pipeData.getSerialNumber(),
+ pipeData.getType(),
+ pipeData);
+ try {
+ pipeData.createLoader().load();
+ logger.info(
+ "Load pipeData with serialize number {} successfully.",
pipeData.getSerialNumber());
+ } catch (PipeDataLoadException e) {
+ logger.error("Fail to load pipeData because {}.", e.getMessage());
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " +
e.getMessage());
+ }
+
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer
digest)
+ /**
+ * Receive TsFile based on startIndex.
+ *
+ * @return {@link TSStatusCode#SUCCESS_STATUS} if receive successfully;
{@link
+ * TSStatusCode#SYNC_FILE_REBASE} if startIndex needs to rollback
because mismatched; {@link
+ * TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
+ * @throws TException The connection between the sender and the receiver has
not been established
+ * by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+ */
+ public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer
buff)
throws TException {
+ // step1. check connection
TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
if (identityInfo == null) {
throw new TException("Thrift connection is not alive.");
}
- logger.debug("Invoke checkFileDigest method from client ip = {}",
identityInfo.address);
+ logger.debug("Invoke transportData method from client ip = {}",
identityInfo.address);
+
String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
- synchronized (fileDir.intern()) {
- String fileName = metaInfo.fileName;
- MessageDigest messageDigest = null;
- try {
- messageDigest = MessageDigest.getInstance("SHA-256");
- } catch (NoSuchAlgorithmException e) {
- logger.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR,
e.getMessage());
- }
+ String fileName = metaInfo.fileName;
+ long startIndex = metaInfo.startIndex;
+ File file = new File(fileDir, fileName + SyncConstant.PATCH_SUFFIX);
- try (InputStream inputStream =
- new FileInputStream(new File(fileDir, fileName + PATCH_SUFFIX))) {
- byte[] block = new byte[DATA_CHUNK_SIZE];
- int length;
- while ((length = inputStream.read(block)) > 0) {
- messageDigest.update(block, 0, length);
- }
-
- String localDigest = (new BigInteger(1,
messageDigest.digest())).toString(16);
- byte[] digestBytes = new byte[digest.capacity()];
- digest.get(digestBytes);
- if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
- logger.error(
- "The file {} digest check error. "
- + "The local digest is {} (should be equal to {}).",
- fileName,
- localDigest,
- digest);
- new File(fileDir, fileName + RECORD_SUFFIX).delete();
- return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "File
digest check error.");
- }
- } catch (IOException e) {
- logger.error(e.getMessage());
- return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR,
e.getMessage());
+ // step2. check startIndex
+ try {
+ CheckResult result = checkStartIndexValid(new File(fileDir, fileName),
startIndex);
+ if (!result.isResult()) {
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE,
result.getIndex());
}
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+ }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
+ // step3. append file
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"))
{
+ int length = buff.capacity();
+ randomAccessFile.seek(startIndex);
+ byte[] byteArray = new byte[length];
+ buff.get(byteArray);
+ randomAccessFile.write(byteArray);
+ recordStartIndex(new File(fileDir, fileName), startIndex + length);
+ logger.debug(
+ "Sync "
+ + fileName
+ + " start at "
+ + startIndex
+ + " to "
+ + (startIndex + length)
+ + " is done.");
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
- }
- private void writeRecordFile(File recordFile, long position) throws
IOException {
- File tmpFile = new File(recordFile.getAbsolutePath() + ".tmp");
- FileWriter fileWriter = new FileWriter(tmpFile, false);
- fileWriter.write(String.valueOf(position));
- fileWriter.close();
- Files.move(tmpFile.toPath(), recordFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
/**
@@ -382,7 +324,9 @@ public class ReceiverManager {
String tsFileName = tsFilePipeData.getTsFileName();
File dir = new File(fileDir);
File[] targetFiles =
- dir.listFiles((dir1, name) -> name.startsWith(tsFileName) &&
name.endsWith(PATCH_SUFFIX));
+ dir.listFiles(
+ (dir1, name) ->
+ name.startsWith(tsFileName) &&
name.endsWith(SyncConstant.PATCH_SUFFIX));
if (targetFiles != null) {
for (File targetFile : targetFiles) {
File newFile =
@@ -390,18 +334,12 @@ public class ReceiverManager {
dir,
targetFile
.getName()
- .substring(0, targetFile.getName().length() -
PATCH_SUFFIX.length()));
+ .substring(
+ 0, targetFile.getName().length() -
SyncConstant.PATCH_SUFFIX.length()));
targetFile.renameTo(newFile);
}
}
tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
- File recordFile = new File(fileDir, tsFileName + RECORD_SUFFIX);
- try {
- Files.deleteIfExists(recordFile.toPath());
- } catch (IOException e) {
- logger.warn(
- String.format("Delete record file %s error, because %s.",
recordFile.getPath(), e));
- }
}
// endregion
@@ -427,6 +365,22 @@ public class ReceiverManager {
}
}
+ /**
+ * Get the start index recorded for a file on the current connection
+ *
+ * @return startIndex of file: -1 if file doesn't exist
+ */
+ private long getCurrentFileStartIndex(String absolutePath) {
+ Long id = currentConnectionId.get();
+ if (id != null) {
+ Map<String, Long> map = connectionIdToStartIndexRecord.get(id);
+ if (map != null && map.containsKey(absolutePath)) {
+ return map.get(absolutePath);
+ }
+ }
+ return -1;
+ }
+
private void createConnection(TSyncIdentityInfo identityInfo) {
long connectionId = connectionIdGenerator.incrementAndGet();
currentConnectionId.set(connectionId);
@@ -440,6 +394,7 @@ public class ReceiverManager {
if (checkConnection()) {
long id = currentConnectionId.get();
connectionIdToIdentityInfoMap.remove(id);
+ connectionIdToStartIndexRecord.remove(id); // release this connection's file offsets
currentConnectionId.remove();
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index fc68a92324..6ea46e968d 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.SyncInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index 0d53ecf0dc..26919b948e 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
-import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
+import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git
a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
index afb932cf27..d84a2933e6 100644
---
a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
@@ -19,8 +19,11 @@
*/
package org.apache.iotdb.db.sync.transport;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -33,6 +36,12 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
@@ -42,17 +51,27 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SyncTransportTest {
+
+ private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
/** create tsfile and move to tmpDir for sync test */
File tmpDir = new File("target/synctest");
@@ -61,6 +80,10 @@ public class SyncTransportTest {
long createdTime1 = System.currentTimeMillis();
File fileDir;
+ File tsfile;
+ File resourceFile;
+ File modsFile;
+
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
@@ -83,6 +106,19 @@ public class SyncTransportTest {
FileUtils.deleteDirectory(tmpDir);
}
FileUtils.moveDirectory(srcDir, tmpDir);
+ tsfile = null;
+ resourceFile = null;
+ modsFile = null;
+ File[] fileList = tmpDir.listFiles();
+ for (File f : fileList) {
+ if (f.getName().endsWith(".tsfile")) {
+ tsfile = f;
+ } else if (f.getName().endsWith(".mods")) {
+ modsFile = f;
+ } else if (f.getName().endsWith(".resource")) {
+ resourceFile = f;
+ }
+ }
EnvironmentUtils.cleanEnv();
EnvironmentUtils.envSetUp();
}
@@ -94,21 +130,130 @@ public class SyncTransportTest {
}
@Test
- public void test() throws Exception {
- // 1. prepare fake file
- File[] fileList = tmpDir.listFiles();
- File tsfile = null;
- File resourceFile = null;
- File modsFile = null;
- for (File f : fileList) {
- if (f.getName().endsWith(".tsfile")) {
- tsfile = f;
- } else if (f.getName().endsWith(".mods")) {
- modsFile = f;
- } else if (f.getName().endsWith(".resource")) {
- resourceFile = f;
+ public void testTransportFile() throws Exception {
+ TSyncIdentityInfo identityInfo =
+ new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1,
config.getIoTDBVersion());
+ try (TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ "127.0.0.1",
+ 6667,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+ TProtocol protocol;
+ if (config.isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ IClientRPCService.Client serviceClient = new
IClientRPCService.Client(protocol);
+ // Underlay socket open.
+ if (!transport.isOpen()) {
+ transport.open();
+ }
+ byte[] buffer = new byte[10];
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(tsfile,
"rw")) {
+ // no handshake, response TException
+ try {
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 0),
ByteBuffer.wrap(buffer));
+ Assert.fail();
+ } catch (TException e) {
+ // do nothing
+ }
+ serviceClient.handshake(identityInfo);
+ // response REBASE:0
+ randomAccessFile.read(buffer, 0, 10);
+ TSStatus tsStatus1 =
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 1),
ByteBuffer.wrap(buffer));
+ Assert.assertEquals(tsStatus1.getCode(),
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ Assert.assertEquals(tsStatus1.getMessage(), "0");
+ // response SUCCESS
+ TSStatus tsStatus2 =
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 0),
ByteBuffer.wrap(buffer));
+ Assert.assertEquals(tsStatus2.getCode(),
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ // response REBASE:10
+ TSStatus tsStatus3 =
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 0),
ByteBuffer.wrap(buffer));
+ Assert.assertEquals(tsStatus3.getCode(),
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ Assert.assertEquals(tsStatus3.getMessage(), "10");
+ TSStatus tsStatus4 =
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 100),
ByteBuffer.wrap(buffer));
+ Assert.assertEquals(tsStatus4.getCode(),
TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ Assert.assertEquals(tsStatus4.getMessage(), "10");
+ // response SUCCESS
+ byte[] remainBuffer = new byte[(int) (randomAccessFile.length() - 10)];
+ randomAccessFile.read(remainBuffer, 0, (int)
(randomAccessFile.length() - 10));
+ TSStatus tsStatus5 =
+ serviceClient.sendFile(
+ new TSyncTransportMetaInfo(tsfile.getName(), 10),
ByteBuffer.wrap(remainBuffer));
+ Assert.assertEquals(tsStatus5.getCode(),
TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
}
+ // check completeness of file
+ File receiveFile =
+ new File(
+ SyncPathUtil.getFileDataDirPath(identityInfo),
+ tsfile.getName() + SyncConstant.PATCH_SUFFIX);
+ Assert.assertTrue(receiveFile.exists());
+
+ try (RandomAccessFile originFileRAF = new RandomAccessFile(tsfile, "r");
+ RandomAccessFile receiveFileRAF = new RandomAccessFile(receiveFile,
"r")) {
+ Assert.assertEquals(originFileRAF.length(), receiveFileRAF.length());
+ byte[] buffer1 = new byte[(int) originFileRAF.length()];
+ byte[] buffer2 = new byte[(int) receiveFile.length()];
+ originFileRAF.read(buffer1);
+ receiveFileRAF.read(buffer2);
+ Assert.assertArrayEquals(buffer1, buffer2);
+ }
+ }
+
+ @Test
+ public void testTransportPipeData() throws Exception {
+ try (TTransport transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ "127.0.0.1",
+ 6667,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+ TProtocol protocol;
+ if (config.isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ IClientRPCService.Client serviceClient = new
IClientRPCService.Client(protocol);
+ // Underlay socket open.
+ if (!transport.isOpen()) {
+ transport.open();
+ }
+ PipeData pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new
PartialPath("root.sg1")), 0);
+ byte[] buffer = pipeData.serialize();
+ ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+ try {
+ TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+ Assert.fail();
+ } catch (TException e) {
+ // do nothing
+ }
+ serviceClient.handshake(
+ new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1,
config.getIoTDBVersion()));
+ TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+ Assert.assertEquals(tsStatus.getCode(),
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+ }
+
+ @Test
+ public void testTransportClient() throws Exception {
+ // 1. prepare fake file
Assert.assertNotNull(tsfile);
Assert.assertNotNull(modsFile);
Assert.assertNotNull(resourceFile);
@@ -136,7 +281,7 @@ public class SyncTransportTest {
pipe, "127.0.0.1",
IoTDBDescriptor.getInstance().getConfig().getRpcPort(), "127.0.0.1");
client.handshake();
for (PipeData pipeData : pipeDataList) {
- client.senderTransport(pipeData);
+ client.send(pipeData);
}
// 4. check result
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 2eb1ef7a52..b7d095e5c7 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -71,8 +71,7 @@ public enum TSStatusCode {
TEMPLATE_NOT_EXIST(339),
CREATE_TEMPLATE_ERROR(340),
SYNC_FILE_REBASE(341),
- SYNC_FILE_RETRY(342),
- SYNC_FILE_ERROR(343),
+ SYNC_FILE_ERROR(342),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/thrift/src/main/thrift/client.thrift
b/thrift/src/main/thrift/client.thrift
index 014bda77dd..7405beae93 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -420,20 +420,11 @@ struct TSyncIdentityInfo{
4:required string version
}
-enum TSyncTransportType {
- TSFILE,
- DELETION,
- PHYSICALPLAN,
- FILE
-}
-
struct TSyncTransportMetaInfo{
- // The type of the pipeData in sending.
- 1:required TSyncTransportType type
// The name of the file in sending.
- 2:required string fileName
+ 1:required string fileName
// The start index of the file slice in sending.
- 3:required i64 startIndex
+ 2:required i64 startIndex
}
service IClientRPCService {
@@ -529,7 +520,7 @@ service IClientRPCService {
common.TSStatus handshake(TSyncIdentityInfo info);
- common.TSStatus transportData(1:TSyncTransportMetaInfo metaInfo, 2:binary
buff, 3:binary digest);
+ common.TSStatus sendPipeData(1:binary buff);
- common.TSStatus checkFileDigest(1:TSyncTransportMetaInfo metaInfo, 2:binary
digest);
+ common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
}
\ No newline at end of file