This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e161f2528b9 [IOTDB-6001] Pipe: a non-blocking iotdb connector (iotdb_thrift_connector_v2) (#10174)
e161f2528b9 is described below
commit e161f2528b9208868cee692a58b34596756a82dd
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 16 10:23:54 2023 +0800
[IOTDB-6001] Pipe: a non-blocking iotdb connector (iotdb_thrift_connector_v2) (#10174)
---
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../apache/iotdb/pipe/api/access/RowIterator.java | 75 -----
.../iotdb/commons/client/ClientPoolFactory.java | 24 ++
.../async/AsyncPipeDataTransferServiceClient.java | 167 +++++++++++
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +
.../builtin/connector/IoTDBThriftConnector.java | 16 +-
.../builtin/connector/IoTDBThriftConnectorV1.java | 17 +-
.../builtin/connector/IoTDBThriftConnectorV2.java | 17 +-
.../pipe/agent/receiver/IoTDBThriftReceiver.java | 6 +-
.../db/pipe/agent/receiver/PipeReceiverAgent.java | 10 +-
.../config/constant/PipeConnectorConstant.java | 2 +
...ava => IoTDBThriftConnectorRequestVersion.java} | 7 +-
...orImplV1_1.java => IoTDBSyncConnectorV1_1.java} | 12 +-
.../{ => v1}/IoTDBThriftConnectorClient.java | 2 +-
.../pipe/connector/v1/IoTDBThriftConnectorV1.java | 5 +-
.../pipe/connector/v1/IoTDBThriftReceiverV1.java | 8 +-
.../v1/request/PipeTransferFilePieceReq.java | 4 +-
.../v1/request/PipeTransferFileSealReq.java | 4 +-
.../v1/request/PipeTransferHandshakeReq.java | 4 +-
.../v1/request/PipeTransferInsertNodeReq.java | 4 +-
.../v1/request/PipeTransferTabletReq.java | 6 +-
.../pipe/connector/v2/IoTDBThriftConnectorV2.java | 312 +++++++++++++++++++++
...nsferInsertNodeTabletInsertionEventHandler.java | 51 ++++
...ipeTransferRawTabletInsertionEventHandler.java} | 29 +-
.../PipeTransferTabletInsertionEventHandler.java | 131 +++++++++
.../PipeTransferTsFileInsertionEventHandler.java | 222 +++++++++++++++
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 8 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../event/realtime/PipeRealtimeCollectEvent.java | 8 +-
.../task/subtask/PipeConnectorSubtaskManager.java | 20 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 2 +-
32 files changed, 1003 insertions(+), 184 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 31ebb405907..435b3e0b41f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -106,7 +106,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
final PipeMeta pipeMetaFromDataNode =
pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
if (pipeMetaFromDataNode == null) {
- LOGGER.warn(
+ LOGGER.info(
"PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
+ "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
pipeMetaOnConfigNode);
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
deleted file mode 100644
index ae9849eb3cc..00000000000
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.access;
-
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Type;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-import java.io.IOException;
-import java.util.List;
-
-public interface RowIterator {
-
- /**
- * Returns {@code true} if the iteration has more rows.
- *
- * @return {@code true} if the iteration has more rows
- */
- boolean hasNextRow();
-
- /**
- * Returns the next row in the iteration.
- *
- * <p>Note that the Row instance returned by this method each time is the same instance. In other
- * words, calling {@code next()} will only change the member variables inside the Row instance,
- * but will not generate a new Row instance.
- *
- * @return the next element in the iteration
- * @throws IOException if any I/O errors occur
- */
- Row next() throws IOException;
-
- /** Resets the iteration. */
- void reset();
-
- /**
- * Returns the actual column index of the given column name.
- *
- * @param columnName the column name in Path form
- * @throws PipeParameterNotValidException if the given column name is not existed
- * @return the actual column index of the given column name
- */
- int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
-
- /**
- * Returns the column names
- *
- * @return the column names
- */
- List<Path> getColumnNames();
-
- /**
- * Returns the column data types
- *
- * @return the column data types
- */
- List<Type> getColumnTypes();
-}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b156ec47851..972ffeea774 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -220,4 +221,27 @@ public class ClientPoolFactory {
.getConfig());
}
}
+
+ public static class AsyncPipeDataTransferServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
+
+ @Override
+ public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient>
createClientPool(
+ ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new AsyncPipeDataTransferServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .build(),
+ ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
+ new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+ .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+ .build()
+ .getConfig());
+ }
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
new file mode 100644
index 00000000000..534349460f1
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncClient
+ implements ThriftClient {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class);
+
+ private final boolean printLogWhenEncounterException;
+
+ private final TEndPoint endpoint;
+ private final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager;
+
+ private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true);
+
+ public AsyncPipeDataTransferServiceClient(
+ ThriftClientProperty property,
+ TEndPoint endpoint,
+ TAsyncClientManager tClientManager,
+ ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager)
+ throws IOException {
+ super(
+ property.getProtocolFactory(),
+ tClientManager,
+ TNonblockingSocketWrapper.wrap(
+ endpoint.getIp(), endpoint.getPort(),
property.getConnectionTimeoutMs()));
+ this.printLogWhenEncounterException =
property.isPrintLogWhenEncounterException();
+ this.endpoint = endpoint;
+ this.clientManager = clientManager;
+ }
+
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ returnSelf();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ super.onError(e);
+ ThriftClient.resolveException(e, this);
+ returnSelf();
+ }
+
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ @Override
+ public boolean printLogWhenEncounterException() {
+ return printLogWhenEncounterException;
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ public void returnSelf() {
+ if (shouldReturnSelf.get()) {
+ clientManager.returnClient(endpoint, this);
+ }
+ }
+
+ public void setShouldReturnSelf(boolean shouldReturnSelf) {
+ this.shouldReturnSelf.set(shouldReturnSelf);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
+ private boolean isReady() {
+ try {
+ checkReady();
+ return true;
+ } catch (Exception e) {
+ if (printLogWhenEncounterException) {
+ LOGGER.error("Unexpected exception occurs in {} : {}", this,
e.getMessage());
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint);
+ }
+
+ public static class Factory
+ extends AsyncThriftClientFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
+
+ public Factory(
+ ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager,
+ ThriftClientProperty thriftClientProperty,
+ String threadName) {
+ super(clientManager, thriftClientProperty, threadName);
+ }
+
+ @Override
+ public void destroyObject(
+ TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient>
pooledObject) {
+ pooledObject.getObject().close();
+ }
+
+ @Override
+ public PooledObject<AsyncPipeDataTransferServiceClient>
makeObject(TEndPoint endPoint)
+ throws Exception {
+ return new DefaultPooledObject<>(
+ new AsyncPipeDataTransferServiceClient(
+ thriftClientProperty,
+ endPoint,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ clientManager));
+ }
+
+ @Override
+ public boolean validateObject(
+ TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient>
pooledObject) {
+ return pooledObject.getObject().isReady();
+ }
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 8bf376138e3..e8ef0018810 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -23,6 +23,8 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
public enum BuiltinPipePlugin {
@@ -36,6 +38,8 @@ public enum BuiltinPipePlugin {
// connectors
DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class),
+ IOTDB_THRIFT_CONNECTOR_V1("iotdb_thrift_connector_v1",
IoTDBThriftConnectorV1.class),
+ IOTDB_THRIFT_CONNECTOR_V2("iotdb_thrift_connector_v2",
IoTDBThriftConnectorV2.class),
IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1",
IoTDBSyncConnectorV1_1.class),
;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index 315e1347a6d..099bdd15557 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -36,43 +36,43 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
public class IoTDBThriftConnector implements PipeConnector {
@Override
- public void validate(PipeParameterValidator validator) {
+ public final void validate(PipeParameterValidator validator) {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void customize(
+ public final void customize(
PipeParameters parameters, PipeConnectorRuntimeConfiguration
configuration) {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void handshake() {
+ public final void handshake() {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void heartbeat() {
+ public final void heartbeat() {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+ public final void transfer(TabletInsertionEvent tabletInsertionEvent) {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+ public final void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void transfer(Event event) {
+ public final void transfer(Event event) {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
@Override
- public void close() {
+ public final void close() {
throw new UnsupportedOperationException("This class is a placeholder and
should not be used.");
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
similarity index 74%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
index 7dac858ae3f..e708767aac6 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java
@@ -17,19 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-public enum IoTDBThriftConnectorVersion {
- VERSION_ONE((byte) 1),
- ;
-
- private final byte version;
-
- IoTDBThriftConnectorVersion(byte type) {
- this.version = type;
- }
-
- public byte getVersion() {
- return version;
- }
-}
+public class IoTDBThriftConnectorV1 extends IoTDBThriftConnector {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
similarity index 74%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
copy to
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
index 7dac858ae3f..c5aa3710ad2 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java
@@ -17,19 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-public enum IoTDBThriftConnectorVersion {
- VERSION_ONE((byte) 1),
- ;
-
- private final byte version;
-
- IoTDBThriftConnectorVersion(byte type) {
- this.version = type;
- }
-
- public byte getVersion() {
- return version;
- }
-}
+public class IoTDBThriftConnectorV2 extends IoTDBThriftConnector {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
index 4230ee73a34..80b18288c3b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
@@ -21,15 +21,15 @@ package org.apache.iotdb.db.pipe.agent.receiver;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
public interface IoTDBThriftReceiver {
- IoTDBThriftConnectorVersion getVersion();
+ IoTDBThriftConnectorRequestVersion getVersion();
- TPipeTransferResp handleTransferReq(
+ TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher);
void handleExit();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index fdeda91ea56..663e169c0bf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.agent.receiver;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftReceiverV1;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -37,11 +37,11 @@ public class PipeReceiverAgent {
private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new
ThreadLocal<>();
- public TPipeTransferResp transfer(
+ public TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
final byte reqVersion = req.getVersion();
- if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
- return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher,
schemaFetcher);
+ if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+ return getReceiver(reqVersion).receive(req, partitionFetcher,
schemaFetcher);
} else {
return new TPipeTransferResp(
RpcUtils.getStatus(
@@ -71,7 +71,7 @@ public class PipeReceiverAgent {
}
private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
- if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+ if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
receiverThreadLocal.set(new IoTDBThriftReceiverV1());
} else {
throw new UnsupportedOperationException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 7c92206de81..d0b9002d2d4 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -25,9 +25,11 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+ public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
+
public static final String CONNECTOR_IOTDB_PASSWORD_KEY =
"connector.password";
public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
similarity index 87%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
index 7dac858ae3f..670dd9eb6e0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java
@@ -19,13 +19,14 @@
package org.apache.iotdb.db.pipe.connector;
-public enum IoTDBThriftConnectorVersion {
- VERSION_ONE((byte) 1),
+public enum IoTDBThriftConnectorRequestVersion {
+ VERSION_1((byte) 1),
+ VERSION_2((byte) 1),
;
private final byte version;
- IoTDBThriftConnectorVersion(byte type) {
+ IoTDBThriftConnectorRequestVersion(byte type) {
this.version = type;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
index 3099cf008cc..53a45e83a87 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -64,9 +64,9 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
-public class IoTDBSyncConnectorImplV1_1 implements PipeConnector {
+public class IoTDBSyncConnectorV1_1 implements PipeConnector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncConnectorV1_1.class);
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1";
@@ -163,7 +163,7 @@ public class IoTDBSyncConnectorImplV1_1 implements
PipeConnector {
"IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent
and PipeTabletInsertionEvent.");
}
} catch (TException e) {
- LOGGER.error(
+ LOGGER.warn(
"Network error when transfer tablet insertion event: {}.",
tabletInsertionEvent, e);
// the connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException(
@@ -193,7 +193,7 @@ public class IoTDBSyncConnectorImplV1_1 implements
PipeConnector {
try {
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
- LOGGER.error(
+ LOGGER.warn(
"Network error when transfer tsFile insertion event: {}.",
tsFileInsertionEvent, e);
// The connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException("Network error when transfer tsFile
insertion event.", e);
@@ -244,7 +244,7 @@ public class IoTDBSyncConnectorImplV1_1 implements
PipeConnector {
}
}
} catch (TException e) {
- LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.",
ipAddress, port), e);
+ LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.",
ipAddress, port), e);
throw new PipeConnectionException(e.getMessage(), e);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
index db0047372db..c8f506fdded 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.db.pipe.connector.v1;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 3e1e0db312c..e863bad5dd9 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -133,7 +132,7 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
"IoTDBThriftConnectorV1 only support
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
}
} catch (TException e) {
- LOGGER.error(
+ LOGGER.warn(
"Network error when transfer tablet insertion event: {}.",
tabletInsertionEvent, e);
// the connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException(
@@ -184,7 +183,7 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
try {
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
- LOGGER.error(
+ LOGGER.warn(
"Network error when transfer tsfile insertion event: {}.",
tsFileInsertionEvent, e);
// the connection may be broken, try to reconnect by catching
PipeConnectionException
throw new PipeConnectionException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
index dfd6765b68d..5ab2b19c5a7 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -61,7 +61,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
private RandomAccessFile writingFileWriter;
@Override
- public synchronized TPipeTransferResp handleTransferReq(
+ public synchronized TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
final short rawRequestType = req.getType();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
@@ -297,7 +297,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
}
@Override
- public IoTDBThriftConnectorVersion getVersion() {
- return IoTDBThriftConnectorVersion.VERSION_ONE;
+ public IoTDBThriftConnectorRequestVersion getVersion() {
+ return IoTDBThriftConnectorRequestVersion.VERSION_1;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
index 897ee2a29e7..d26939ec54f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.connector.v1.request;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -58,7 +58,7 @@ public class PipeTransferFilePieceReq extends
TPipeTransferReq {
filePieceReq.startWritingOffset = startWritingOffset;
filePieceReq.filePiece = filePiece;
- filePieceReq.version =
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+ filePieceReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
index 0e73b95af07..1314e5e5ae0 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.connector.v1.request;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -51,7 +51,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq
{
fileSealReq.fileName = fileName;
fileSealReq.fileLength = fileLength;
- fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+ fileSealReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
index da03903fd79..aceb9a5877b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.connector.v1.request;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -45,7 +45,7 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
handshakeReq.timestampPrecision = timestampPrecision;
- handshakeReq.version =
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+ handshakeReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
index 10098a9721e..157a4200fd6 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -83,7 +83,7 @@ public class PipeTransferInsertNodeReq extends
TPipeTransferReq {
req.insertNode = insertNode;
- req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+ req.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
req.body = insertNode.serializeToByteBuffer();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
index 758013d6c72..ead686a6acf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
@@ -47,12 +47,12 @@ public class PipeTransferTabletReq extends TPipeTransferReq
{
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletReq.class);
private Tablet tablet;
- public static TPipeTransferReq toTPipeTransferReq(Tablet tablet) throws
IOException {
+ public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws
IOException {
final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
tabletReq.tablet = tablet;
- tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+ tabletReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
tabletReq.body = tablet.serialize();
return tabletReq;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
new file mode 100644
index 00000000000..e22d2620cc9
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
+import
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
+import
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferRawTabletInsertionEventHandler;
+import
org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferTsFileInsertionEventHandler;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+
+public class IoTDBThriftConnectorV2 implements PipeConnector {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+
+ private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ private static final IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER =
+ new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ private final AtomicLong commitIdGenerator = new AtomicLong(0);
+ private final AtomicLong lastCommitId = new AtomicLong(0);
+ private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
+ new PriorityQueue<>(Comparator.comparing(o -> o.left));
+
+ private List<TEndPoint> nodeUrls;
+
+ public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
+ commitQueue.offer(
+ new Pair<>(
+ requestCommitId,
+ () ->
+ Optional.ofNullable(enrichedEvent)
+ .ifPresent(
+ event ->
+
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
+
+ while (!commitQueue.isEmpty()) {
+ final Pair<Long, Runnable> committer = commitQueue.peek();
+ if (lastCommitId.get() + 1 != committer.left) {
+ break;
+ }
+
+ committer.right.run();
+ lastCommitId.incrementAndGet();
+
+ commitQueue.poll();
+ }
+ }
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // node urls string should be like "localhost:6667,localhost:6668"
+ validator.validateRequiredAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY);
+ }
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {
+ nodeUrls =
+ SessionUtils.parseSeedNodeUrls(
+
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")));
+ if (nodeUrls.isEmpty()) {
+ throw new PipeException("Node urls is empty.");
+ }
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ final TEndPoint firstNodeUrl = nodeUrls.get(0);
+ try (IoTDBThriftConnectorClient client =
+ new IoTDBThriftConnectorClient(
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+ .build(),
+ firstNodeUrl.getIp(),
+ firstNodeUrl.getPort())) {
+ final TPipeTransferResp resp =
+ client.pipeTransfer(
+
PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(String.format("Handshake error, result status
%s.", resp.status));
+ }
+ } catch (TException e) {
+ LOGGER.warn(
+ String.format(
+ "Connect to receiver %s:%s error.", firstNodeUrl.getIp(),
firstNodeUrl.getPort()),
+ e);
+ throw new PipeConnectionException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void heartbeat() {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
+ final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
+ PipeTransferInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode());
+ final PipeTransferInsertNodeTabletInsertionEventHandler
pipeTransferInsertNodeReqHandler =
+ new PipeTransferInsertNodeTabletInsertionEventHandler(
+ requestCommitId, pipeInsertNodeTabletInsertionEvent,
pipeTransferInsertNodeReq, this);
+
+ transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+ } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+ final PipeTransferTabletReq pipeTransferTabletReq =
+
PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet());
+ final PipeTransferRawTabletInsertionEventHandler
pipeTransferTabletReqHandler =
+ new PipeTransferRawTabletInsertionEventHandler(
+ requestCommitId, pipeTransferTabletReq, this);
+
+ transfer(requestCommitId, pipeTransferTabletReqHandler);
+ } else {
+ throw new NotImplementedException(
+ "IoTDBThriftConnectorV2 only support
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
+ }
+ }
+
+ public void transfer(
+ long requestCommitId,
+ PipeTransferInsertNodeTabletInsertionEventHandler
pipeTransferInsertNodeReqHandler) {
+ final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
+
+ try {
+ final AsyncPipeDataTransferServiceClient client =
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+ try {
+ pipeTransferInsertNodeReqHandler.transfer(client);
+ } catch (TException e) {
+ LOGGER.warn(
+ String.format(
+ "Transfer insert node to receiver %s:%s error, retrying...",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ e);
+ }
+ } catch (Exception ex) {
+ pipeTransferInsertNodeReqHandler.onError(ex);
+ LOGGER.warn(
+ String.format(
+ "Failed to borrow client from client pool for receiver %s:%s.",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ ex);
+ }
+ }
+
+ public void transfer(
+ long requestCommitId,
+ PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler)
{
+ final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
+
+ try {
+ final AsyncPipeDataTransferServiceClient client =
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+ try {
+ pipeTransferTabletReqHandler.transfer(client);
+ } catch (TException e) {
+ LOGGER.warn(
+ String.format(
+ "Transfer tablet to receiver %s:%s error, retrying...",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ e);
+ }
+ } catch (Exception ex) {
+ pipeTransferTabletReqHandler.onError(ex);
+ LOGGER.warn(
+ String.format(
+ "Failed to borrow client from client pool for receiver %s:%s.",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ ex);
+ }
+ }
+
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
+ if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+ throw new NotImplementedException(
+ "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.");
+ }
+
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
+ final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
+ (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+ final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
+ new PipeTransferTsFileInsertionEventHandler(
+ requestCommitId, pipeTsFileInsertionEvent, this);
+
+ pipeTsFileInsertionEvent.waitForTsFileClose();
+ transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
+ }
+
+ public void transfer(
+ long requestCommitId,
+ PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler) {
+ final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
+
+ try {
+ final AsyncPipeDataTransferServiceClient client =
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+
+ try {
+ pipeTransferTsFileInsertionEventHandler.transfer(client);
+ } catch (TException e) {
+ LOGGER.warn(
+ String.format(
+ "Transfer tsfile to receiver %s:%s error, retrying...",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ e);
+ }
+ } catch (Exception ex) {
+ pipeTransferTsFileInsertionEventHandler.onError(ex);
+ LOGGER.warn(
+ String.format(
+ "Failed to borrow client from client pool for receiver %s:%s.",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ ex);
+ }
+ }
+
+ @Override
+ public void transfer(Event event) {
+ LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic
event: {}.", event);
+ }
+
+ @Override
+ public void close() {
+ isClosed.set(true);
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
new file mode 100644
index 00000000000..c8d95c8691e
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Callback for a tablet insertion request that was built from an insert node.
+ *
+ * <p>Sending and retrying are both delegated: the request goes out through the client's {@code
+ * pipeTransfer} with this handler as callback, and a retry re-submits this handler to the
+ * connector under the same commit id.
+ */
+public class PipeTransferInsertNodeTabletInsertionEventHandler
+    extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+
+  /**
+   * @param requestCommitId commit id assigned to this request by the connector
+   * @param event the event being transferred, may be null
+   * @param req the serialized transfer request
+   * @param connector the connector used for committing and retrying
+   */
+  public PipeTransferInsertNodeTabletInsertionEventHandler(
+      final long requestCommitId,
+      @Nullable final EnrichedEvent event,
+      final TPipeTransferReq req,
+      final IoTDBThriftConnectorV2 connector) {
+    super(requestCommitId, event, req, connector);
+  }
+
+  /** Sends {@code req} asynchronously, registering this handler as the callback. */
+  @Override
+  protected void doTransfer(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
+      throws TException {
+    client.pipeTransfer(req, this);
+  }
+
+  /** Hands this handler back to the connector for another attempt. */
+  @Override
+  protected void retryTransfer(
+      final IoTDBThriftConnectorV2 connector, final long requestCommitId) {
+    connector.transfer(requestCommitId, this);
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
similarity index 51%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
copy to
server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
index 4230ee73a34..d0096eaafc2 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
@@ -17,20 +17,31 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent.receiver;
+package org.apache.iotdb.db.pipe.connector.v2.handler;
-import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion;
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-public interface IoTDBThriftReceiver {
+import org.apache.thrift.TException;
- IoTDBThriftConnectorVersion getVersion();
+public class PipeTransferRawTabletInsertionEventHandler
+ extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
- TPipeTransferResp handleTransferReq(
- TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher);
+ public PipeTransferRawTabletInsertionEventHandler(
+ long requestCommitId, TPipeTransferReq req, IoTDBThriftConnectorV2
connector) {
+ super(requestCommitId, null, req, connector);
+ }
- void handleExit();
+ @Override
+ protected void doTransfer(AsyncPipeDataTransferServiceClient client,
TPipeTransferReq req)
+ throws TException {
+ client.pipeTransfer(req, this);
+ }
+
+ @Override
+ protected void retryTransfer(IoTDBThriftConnectorV2 connector, long
requestCommitId) {
+ connector.transfer(requestCommitId, this);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
new file mode 100644
index 00000000000..6b150c53357
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base thrift callback for one tablet insertion transfer request.
+ *
+ * <p>On a success status the request is committed on the connector. On any other outcome the
+ * request is retried asynchronously after a wait that doubles with every retry, capped at {@link
+ * #MAX_RETRY_WAIT_TIME_MS}, until the connector is closed.
+ *
+ * @param <E> the response type delivered to {@link #onComplete(TPipeTransferResp)}
+ */
+public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTransferResp>
+    implements AsyncMethodCallback<E> {
+
+  // Use this class for the logger so log lines are attributed to the handler that emits them.
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
+
+  private final long requestCommitId;
+  private final EnrichedEvent event;
+  private final TPipeTransferReq req;
+
+  private final IoTDBThriftConnectorV2 connector;
+
+  // upper bound of the retry wait: the configured retry interval times 2^5
+  private static final long MAX_RETRY_WAIT_TIME_MS =
+      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2, 5));
+  private int retryCount = 0;
+
+  /**
+   * Creates the handler and, if an event is given, increases its reference count.
+   *
+   * @param requestCommitId commit id assigned to this request by the connector
+   * @param event the event being transferred, may be null
+   * @param req the serialized transfer request
+   * @param connector the connector used for committing and retrying
+   */
+  public PipeTransferTabletInsertionEventHandler(
+      long requestCommitId,
+      @Nullable EnrichedEvent event,
+      TPipeTransferReq req,
+      IoTDBThriftConnectorV2 connector) {
+    this.requestCommitId = requestCommitId;
+    this.event = event;
+    this.req = req;
+    this.connector = connector;
+
+    Optional.ofNullable(event)
+        .ifPresent(
+            e -> e.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName()));
+  }
+
+  /** Sends the stored request through the given client. */
+  public void transfer(AsyncPipeDataTransferServiceClient client) throws TException {
+    doTransfer(client, req);
+  }
+
+  /** Performs the actual asynchronous call for {@code req} on {@code client}. */
+  protected abstract void doTransfer(
+      AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) throws TException;
+
+  /** Commits the request on success; treats a null or non-success response as an error. */
+  @Override
+  public void onComplete(TPipeTransferResp response) {
+    // just in case
+    if (response == null) {
+      onError(new PipeException("TPipeTransferResp is null"));
+      return;
+    }
+
+    if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      connector.commit(requestCommitId, event);
+    } else {
+      onError(new PipeException(response.getStatus().getMessage()));
+    }
+  }
+
+  /**
+   * Schedules an asynchronous retry after the backoff wait, unless the connector has been closed
+   * by the time the wait is over.
+   */
+  @Override
+  public void onError(Exception exception) {
+    ++retryCount;
+
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            // wait retryInterval * 2^(retryCount - 1), but never longer than the cap
+            Thread.sleep(
+                Math.min(
+                    (long)
+                        (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
+                            * Math.pow(2d, retryCount - 1d)),
+                    MAX_RETRY_WAIT_TIME_MS));
+          } catch (InterruptedException e) {
+            // restore the interrupt flag; the retry decision below is still made
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during retrying", e);
+          }
+
+          if (connector.isClosed()) {
+            LOGGER.info(
+                "IoTDBThriftConnectorV2 has been stopped, we will not retry this request {} after {} times",
+                req,
+                retryCount,
+                exception);
+          } else {
+            LOGGER.warn(
+                "IoTDBThriftConnectorV2 failed to transfer request {} after {} times, retrying...",
+                req,
+                retryCount,
+                exception);
+
+            retryTransfer(connector, requestCommitId);
+          }
+        });
+  }
+
+  /** Re-submits this handler to {@code connector} under the same commit id. */
+  protected abstract void retryTransfer(IoTDBThriftConnectorV2 connector, long requestCommitId);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
new file mode 100644
index 00000000000..21c244f0537
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.v2.handler;
+
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Asynchronous callback that transfers the TsFile of a {@link PipeTsFileInsertionEvent} to the
+ * receiver.
+ *
+ * <p>The file is read in chunks of the configured read-buffer size. Every call to {@link
+ * #transfer(AsyncPipeDataTransferServiceClient)} sends one chunk; each piece response received in
+ * {@link #onComplete(TPipeTransferResp)} triggers the next chunk. When the file is exhausted a
+ * seal request is sent, and a successful seal response commits the event to the connector. Any
+ * failure goes through {@link #onError(Exception)}, which restarts the transfer from the
+ * beginning of the file after a bounded exponential backoff.
+ */
+public class PipeTransferTsFileInsertionEventHandler
+    implements AsyncMethodCallback<TPipeTransferResp> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
+
+  private final long requestCommitId;
+  private final PipeTsFileInsertionEvent event;
+  private final IoTDBThriftConnectorV2 connector;
+
+  private final File tsFile;
+  private final int readFileBufferSize;
+  private final byte[] readBuffer;
+  // offset in the file of the next piece to send
+  private long position;
+
+  private RandomAccessFile reader;
+
+  // true once the seal request has been sent, so the next response is the seal response
+  private final AtomicBoolean isSealSignalSent;
+
+  private AsyncPipeDataTransferServiceClient client;
+
+  // upper bound of the retry backoff: the configured retry interval times 2^5
+  private static final long MAX_RETRY_WAIT_TIME_MS =
+      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2, 5));
+  private int retryCount = 0;
+
+  /**
+   * Creates a handler for one TsFile event and opens the file for reading.
+   *
+   * @param requestCommitId the commit id passed back to the connector on commit and on retry
+   * @param event the event whose TsFile is transferred; its reference count is increased here
+   * @param connector the connector that owns this handler
+   * @throws FileNotFoundException if the TsFile cannot be opened for reading
+   */
+  public PipeTransferTsFileInsertionEventHandler(
+      long requestCommitId, PipeTsFileInsertionEvent event, IoTDBThriftConnectorV2 connector)
+      throws FileNotFoundException {
+    this.requestCommitId = requestCommitId;
+    this.event = event;
+    this.connector = connector;
+
+    tsFile = event.getTsFile();
+    readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    readBuffer = new byte[readFileBufferSize];
+    position = 0;
+
+    reader = new RandomAccessFile(tsFile, "r");
+
+    isSealSignalSent = new AtomicBoolean(false);
+
+    // register this handler, not the tablet handler, as the holder of the reference
+    event.increaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName());
+  }
+
+  /**
+   * Sends the next piece of the file, or the seal request once the end of the file is reached.
+   *
+   * @param client the asynchronous client to send with; it is remembered for subsequent pieces
+   * @throws TException if the client fails to submit the request
+   * @throws IOException if reading the TsFile fails
+   */
+  public void transfer(AsyncPipeDataTransferServiceClient client) throws TException, IOException {
+    this.client = client;
+    // NOTE(review): presumably keeps the client from being handed back between pieces; it is
+    // switched back to true right before returnSelf() is called. Confirm against the client.
+    client.setShouldReturnSelf(false);
+
+    final int readLength = reader.read(readBuffer);
+
+    // end of file: every piece has been sent, ask the receiver to seal the file
+    if (readLength == -1) {
+      isSealSignalSent.set(true);
+      client.pipeTransfer(
+          PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()), this);
+      return;
+    }
+
+    // a partially filled buffer is trimmed so that only the bytes actually read are sent
+    client.pipeTransfer(
+        PipeTransferFilePieceReq.toTPipeTransferReq(
+            tsFile.getName(),
+            position,
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength)),
+        this);
+    position += readLength;
+  }
+
+  /**
+   * Handles the response to the last request: a seal response finishes the transfer, a piece
+   * response triggers the next piece.
+   *
+   * @param response the response delivered by the asynchronous client, possibly {@code null}
+   */
+  @Override
+  public void onComplete(TPipeTransferResp response) {
+    // just in case: without this guard a null response fails with an NPE inside the callback
+    // instead of going through the retry path
+    if (response == null) {
+      onError(new PipeException("TPipeTransferResp is null"));
+      return;
+    }
+
+    if (isSealSignalSent.get()) {
+      if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        onError(
+            new PipeException(
+                String.format(
+                    "Seal file %s error, result status %s.", tsFile, response.getStatus())));
+        return;
+      }
+
+      try {
+        closeReaderAndReturnClient();
+      } finally {
+        connector.commit(requestCommitId, event);
+      }
+      return;
+    }
+
+    // if the isSealSignalSent is false, then the response must be a PipeTransferFilePieceResp
+    try {
+      final PipeTransferFilePieceResp resp =
+          PipeTransferFilePieceResp.fromTPipeTransferResp(response);
+
+      // this case only happens when the connection is broken, and the connector is reconnected
+      // to the receiver, then the receiver will redirect the file position to the last position
+      final long code = resp.getStatus().getCode();
+
+      if (code == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+        position = resp.getEndWritingOffset();
+        reader.seek(position);
+        LOGGER.info("Redirect file position to {}.", position);
+      } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException(
+            String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
+      }
+
+      transfer(client);
+    } catch (Exception e) {
+      onError(e);
+    }
+  }
+
+  /**
+   * Releases the reader and the client, then retries the whole file after a backoff delay.
+   *
+   * <p>The delay is the configured retry interval multiplied by {@code 2^(retryCount - 1)}, capped
+   * at {@link #MAX_RETRY_WAIT_TIME_MS}. If the connector is closed after the delay, nothing is
+   * retried. If the file cannot be reopened, this method is invoked again with that failure.
+   *
+   * @param exception the failure that triggered this callback; it is only logged
+   */
+  @Override
+  public void onError(Exception exception) {
+    closeReaderAndReturnClient();
+
+    ++retryCount;
+
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            Thread.sleep(
+                Math.min(
+                    (long)
+                        (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
+                            * Math.pow(2d, retryCount - 1d)),
+                    MAX_RETRY_WAIT_TIME_MS));
+          } catch (InterruptedException e) {
+            // restore the interrupt flag; execution still falls through to the retry decision
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during retrying", e);
+          }
+
+          if (connector.isClosed()) {
+            LOGGER.info(
+                "IoTDBThriftConnectorV2 has been stopped, we will not retry to transfer tsfile {}.",
+                tsFile,
+                exception);
+          } else {
+            LOGGER.warn(
+                "IoTDBThriftConnectorV2 failed to transfer tsfile {} after {} times, retrying...",
+                tsFile,
+                retryCount,
+                exception);
+
+            try {
+              // restart from the beginning of the file with a fresh reader
+              position = 0;
+              reader = new RandomAccessFile(tsFile, "r");
+              isSealSignalSent.set(false);
+
+              connector.transfer(requestCommitId, this);
+            } catch (FileNotFoundException e) {
+              LOGGER.error("Exception occurred when retrying...", e);
+              onError(e);
+            }
+          }
+        });
+  }
+
+  /** Closes the file reader (logging, not propagating, an IOException) and returns the client. */
+  private void closeReaderAndReturnClient() {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close file reader.", e);
+    } finally {
+      if (client != null) {
+        client.setShouldReturnSelf(true);
+        client.returnSelf();
+      }
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 27975236127..f2df19bf2b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -57,7 +57,7 @@ public abstract class EnrichedEvent implements Event {
boolean isSuccessful = true;
synchronized (this) {
if (referenceCount.get() == 0) {
- isSuccessful = increaseResourceReferenceCount(holderMessage);
+ isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
}
referenceCount.incrementAndGet();
}
@@ -71,7 +71,7 @@ public abstract class EnrichedEvent implements Event {
* @return true if the reference count is increased successfully, false if
the event is not
* controlled by the invoker, which means the data stored in the event
is not safe to use
*/
- public abstract boolean increaseResourceReferenceCount(String holderMessage);
+ public abstract boolean internallyIncreaseResourceReferenceCount(String
holderMessage);
/**
* Decrease the reference count of this event. If the reference count is
decreased to 0, the event
@@ -85,7 +85,7 @@ public abstract class EnrichedEvent implements Event {
boolean isSuccessful = true;
synchronized (this) {
if (referenceCount.get() == 1) {
- isSuccessful = decreaseResourceReferenceCount(holderMessage);
+ isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
reportProgress();
}
referenceCount.decrementAndGet();
@@ -100,7 +100,7 @@ public abstract class EnrichedEvent implements Event {
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false
otherwise
*/
- public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+ public abstract boolean internallyDecreaseResourceReferenceCount(String
holderMessage);
private void reportProgress() {
if (pipeTaskMeta != null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 8a3f9ca8a77..5623d7715a6 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -70,7 +70,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean increaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
try {
PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(),
walEntryHandler);
return true;
@@ -85,7 +85,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public boolean decreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
try {
PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d82313dad8d..40aaf53c1ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -107,7 +107,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean increaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
try {
tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
return true;
@@ -122,7 +122,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
}
@Override
- public boolean decreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
try {
PipeResourceManager.file().decreaseFileReference(tsFile);
return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
index 06410b26a02..cb84d0bc11a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java
@@ -95,8 +95,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
}
@Override
- public boolean increaseResourceReferenceCount(String holderMessage) {
- return event.increaseResourceReferenceCount(holderMessage);
+ public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ return event.internallyIncreaseResourceReferenceCount(holderMessage);
}
@Override
@@ -109,8 +109,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent
{
}
@Override
- public boolean decreaseResourceReferenceCount(String holderMessage) {
- return event.decreaseResourceReferenceCount(holderMessage);
+ public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ return event.internallyDecreaseResourceReferenceCount(holderMessage);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index fadb6a0184f..829e0bb993b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -24,8 +24,9 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1;
+import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorV1_1;
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
+import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -61,11 +62,15 @@ public class PipeConnectorSubtaskManager {
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
PipeConnector pipeConnector;
- if
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()))
{
+ if
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ ||
connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V1.getPipePluginName()))
{
pipeConnector = new IoTDBThriftConnectorV1();
+ } else if (connectorKey.equals(
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V2.getPipePluginName())) {
+ pipeConnector = new IoTDBThriftConnectorV2();
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) {
- pipeConnector = new IoTDBSyncConnectorImplV1_1();
+ pipeConnector = new IoTDBSyncConnectorV1_1();
} else {
pipeConnector =
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
}
@@ -126,15 +131,6 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
}
- public PipeConnectorSubtask getPipeConnectorSubtask(String
attributeSortedString) {
- if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- throw new PipeException(
- "Failed to get PipeConnectorSubtask. No such subtask: " +
attributeSortedString);
- }
-
- return
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
- }
-
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..4fbe17270f6 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -2111,7 +2111,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
@Override
public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
- return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher);
+ return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher);
}
@Override