This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-6001-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 39c304b6d3404f658d2ec0ee5cfca2cc261d3e1c Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jun 16 10:23:54 2023 +0800 [IOTDB-6001] Pipe: a non-blocking iotdb connector (iotdb_thrift_connector_v2) (#10174) (cherry picked from commit e161f2528b9208868cee692a58b34596756a82dd) --- .../runtime/PipeHandleMetaChangeProcedure.java | 2 +- .../apache/iotdb/pipe/api/access/RowIterator.java | 75 ----- .../iotdb/commons/client/ClientPoolFactory.java | 24 ++ .../async/AsyncPipeDataTransferServiceClient.java | 167 +++++++++++ .../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 + .../builtin/connector/IoTDBThriftConnector.java | 16 +- .../builtin/connector/IoTDBThriftConnectorV1.java | 17 +- .../builtin/connector/IoTDBThriftConnectorV2.java | 17 +- .../pipe/agent/receiver/IoTDBThriftReceiver.java | 6 +- .../db/pipe/agent/receiver/PipeReceiverAgent.java | 10 +- .../config/constant/PipeConnectorConstant.java | 2 + ...ava => IoTDBThriftConnectorRequestVersion.java} | 7 +- ...orImplV1_1.java => IoTDBSyncConnectorV1_1.java} | 12 +- .../{ => v1}/IoTDBThriftConnectorClient.java | 2 +- .../pipe/connector/v1/IoTDBThriftConnectorV1.java | 5 +- .../pipe/connector/v1/IoTDBThriftReceiverV1.java | 8 +- .../v1/request/PipeTransferFilePieceReq.java | 4 +- .../v1/request/PipeTransferFileSealReq.java | 4 +- .../v1/request/PipeTransferHandshakeReq.java | 4 +- .../v1/request/PipeTransferInsertNodeReq.java | 4 +- .../v1/request/PipeTransferTabletReq.java | 6 +- .../pipe/connector/v2/IoTDBThriftConnectorV2.java | 312 +++++++++++++++++++++ ...nsferInsertNodeTabletInsertionEventHandler.java | 51 ++++ ...ipeTransferRawTabletInsertionEventHandler.java} | 29 +- .../PipeTransferTabletInsertionEventHandler.java | 131 +++++++++ .../PipeTransferTsFileInsertionEventHandler.java | 222 +++++++++++++++ .../apache/iotdb/db/pipe/event/EnrichedEvent.java | 8 +- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +- .../common/tsfile/PipeTsFileInsertionEvent.java | 4 +- 
.../event/realtime/PipeRealtimeCollectEvent.java | 8 +- .../task/subtask/PipeConnectorSubtaskManager.java | 20 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 2 +- 32 files changed, 1003 insertions(+), 184 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 31ebb405907..435b3e0b41f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -106,7 +106,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV final PipeMeta pipeMetaFromDataNode = pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta()); if (pipeMetaFromDataNode == null) { - LOGGER.warn( + LOGGER.info( "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, " + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}", pipeMetaOnConfigNode); diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java deleted file mode 100644 index ae9849eb3cc..00000000000 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/RowIterator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.pipe.api.access; - -import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; -import org.apache.iotdb.pipe.api.type.Type; -import org.apache.iotdb.tsfile.read.common.Path; - -import java.io.IOException; -import java.util.List; - -public interface RowIterator { - - /** - * Returns {@code true} if the iteration has more rows. - * - * @return {@code true} if the iteration has more rows - */ - boolean hasNextRow(); - - /** - * Returns the next row in the iteration. - * - * <p>Note that the Row instance returned by this method each time is the same instance. In other - * words, calling {@code next()} will only change the member variables inside the Row instance, - * but will not generate a new Row instance. - * - * @return the next element in the iteration - * @throws IOException if any I/O errors occur - */ - Row next() throws IOException; - - /** Resets the iteration. */ - void reset(); - - /** - * Returns the actual column index of the given column name. 
- * - * @param columnName the column name in Path form - * @throws PipeParameterNotValidException if the given column name is not existed - * @return the actual column index of the given column name - */ - int getColumnIndex(Path columnName) throws PipeParameterNotValidException; - - /** - * Returns the column names - * - * @return the column names - */ - List<Path> getColumnNames(); - - /** - * Returns the column data types - * - * @return the column data types - */ - List<Type> getColumnTypes(); -} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index b156ec47851..972ffeea774 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.client.property.ClientPoolProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; @@ -220,4 +221,27 @@ public class ClientPoolFactory { .getConfig()); } } + + public static class AsyncPipeDataTransferServiceClientPoolFactory + implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> { + + @Override + public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool( + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) { + return new GenericKeyedObjectPool<>( + new AsyncPipeDataTransferServiceClient.Factory( 
+ manager, + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .build(), + ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), + new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>() + .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode()) + .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) + .build() + .getConfig()); + } + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java new file mode 100644 index 00000000000..534349460f1 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.client.async; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ThriftClient; +import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.rpc.TNonblockingSocketWrapper; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.async.TAsyncClientManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient + implements ThriftClient { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class); + + private final boolean printLogWhenEncounterException; + + private final TEndPoint endpoint; + private final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager; + + private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true); + + public AsyncPipeDataTransferServiceClient( + ThriftClientProperty property, + TEndPoint endpoint, + TAsyncClientManager tClientManager, + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager) + throws IOException { + super( + property.getProtocolFactory(), + tClientManager, + TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); + this.endpoint = endpoint; + this.clientManager = clientManager; + } + + @Override + public void onComplete() { + super.onComplete(); + returnSelf(); + } + + @Override + public void onError(Exception e) { + super.onError(e); 
+ ThriftClient.resolveException(e, this); + returnSelf(); + } + + @Override + public void invalidate() { + if (!hasError()) { + super.onError(new Exception("This client has been invalidated")); + } + } + + @Override + public void invalidateAll() { + clientManager.clear(endpoint); + } + + @Override + public boolean printLogWhenEncounterException() { + return printLogWhenEncounterException; + } + + /** + * return self, the method doesn't need to be called by the user and will be triggered after the + * RPC is finished. + */ + public void returnSelf() { + if (shouldReturnSelf.get()) { + clientManager.returnClient(endpoint, this); + } + } + + public void setShouldReturnSelf(boolean shouldReturnSelf) { + this.shouldReturnSelf.set(shouldReturnSelf); + } + + private void close() { + ___transport.close(); + ___currentMethod = null; + } + + private boolean isReady() { + try { + checkReady(); + return true; + } catch (Exception e) { + if (printLogWhenEncounterException) { + LOGGER.error("Unexpected exception occurs in {} : {}", this, e.getMessage()); + } + return false; + } + } + + @Override + public String toString() { + return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint); + } + + public static class Factory + extends AsyncThriftClientFactory<TEndPoint, AsyncPipeDataTransferServiceClient> { + + public Factory( + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager, + ThriftClientProperty thriftClientProperty, + String threadName) { + super(clientManager, thriftClientProperty, threadName); + } + + @Override + public void destroyObject( + TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(TEndPoint endPoint) + throws Exception { + return new DefaultPooledObject<>( + new AsyncPipeDataTransferServiceClient( + thriftClientProperty, + endPoint, + tManagers[clientCnt.incrementAndGet() 
% tManagers.length], + clientManager)); + } + + @Override + public boolean validateObject( + TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { + return pooledObject.getObject().isReady(); + } + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 8bf376138e3..e8ef0018810 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; public enum BuiltinPipePlugin { @@ -36,6 +38,8 @@ public enum BuiltinPipePlugin { // connectors DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class), IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class), + IOTDB_THRIFT_CONNECTOR_V1("iotdb_thrift_connector_v1", IoTDBThriftConnectorV1.class), + IOTDB_THRIFT_CONNECTOR_V2("iotdb_thrift_connector_v2", IoTDBThriftConnectorV2.class), IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", IoTDBSyncConnectorV1_1.class), ; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java index 315e1347a6d..099bdd15557 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java @@ -36,43 +36,43 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; public class IoTDBThriftConnector implements PipeConnector { @Override - public void validate(PipeParameterValidator validator) { + public final void validate(PipeParameterValidator validator) { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void customize( + public final void customize( PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void handshake() { + public final void handshake() { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void heartbeat() { + public final void heartbeat() { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void transfer(TabletInsertionEvent tabletInsertionEvent) { + public final void transfer(TabletInsertionEvent tabletInsertionEvent) { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void transfer(TsFileInsertionEvent tsFileInsertionEvent) { + public final void transfer(TsFileInsertionEvent tsFileInsertionEvent) { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } @Override - public void transfer(Event event) { + public final void transfer(Event event) { throw new UnsupportedOperationException("This class is a 
placeholder and should not be used."); } @Override - public void close() { + public final void close() { throw new UnsupportedOperationException("This class is a placeholder and should not be used."); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java similarity index 74% copy from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java index 7dac858ae3f..e708767aac6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV1.java @@ -17,19 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.connector; +package org.apache.iotdb.commons.pipe.plugin.builtin.connector; -public enum IoTDBThriftConnectorVersion { - VERSION_ONE((byte) 1), - ; - - private final byte version; - - IoTDBThriftConnectorVersion(byte type) { - this.version = type; - } - - public byte getVersion() { - return version; - } -} +public class IoTDBThriftConnectorV1 extends IoTDBThriftConnector {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java similarity index 74% copy from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java index 7dac858ae3f..c5aa3710ad2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java +++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnectorV2.java @@ -17,19 +17,6 @@ * under the License. */ -package org.apache.iotdb.db.pipe.connector; +package org.apache.iotdb.commons.pipe.plugin.builtin.connector; -public enum IoTDBThriftConnectorVersion { - VERSION_ONE((byte) 1), - ; - - private final byte version; - - IoTDBThriftConnectorVersion(byte type) { - this.version = type; - } - - public byte getVersion() { - return version; - } -} +public class IoTDBThriftConnectorV2 extends IoTDBThriftConnector {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java index 4230ee73a34..80b18288c3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java @@ -21,15 +21,15 @@ package org.apache.iotdb.db.pipe.agent.receiver; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; public interface IoTDBThriftReceiver { - IoTDBThriftConnectorVersion getVersion(); + IoTDBThriftConnectorRequestVersion getVersion(); - TPipeTransferResp handleTransferReq( + TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher); void handleExit(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java index fdeda91ea56..663e169c0bf 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.agent.receiver; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftReceiverV1; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -37,11 +37,11 @@ public class PipeReceiverAgent { private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>(); - public TPipeTransferResp transfer( + public TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { final byte reqVersion = req.getVersion(); - if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) { - return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, schemaFetcher); + if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) { + return getReceiver(reqVersion).receive(req, partitionFetcher, schemaFetcher); } else { return new TPipeTransferResp( RpcUtils.getStatus( @@ -71,7 +71,7 @@ public class PipeReceiverAgent { } private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) { - if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) { + if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) { receiverThreadLocal.set(new IoTDBThriftReceiverV1()); } else { throw new UnsupportedOperationException( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 
7c92206de81..d0b9002d2d4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -25,9 +25,11 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip"; public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port"; + public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; + public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java index 7dac858ae3f..670dd9eb6e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorRequestVersion.java @@ -19,13 +19,14 @@ package org.apache.iotdb.db.pipe.connector; -public enum IoTDBThriftConnectorVersion { - VERSION_ONE((byte) 1), +public enum IoTDBThriftConnectorRequestVersion { + VERSION_1((byte) 1), + VERSION_2((byte) 1), ; private final byte version; - IoTDBThriftConnectorVersion(byte type) { + IoTDBThriftConnectorRequestVersion(byte type) { this.version = type; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java index 3099cf008cc..53a45e83a87 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorImplV1_1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/lagacy/IoTDBSyncConnectorV1_1.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient; +import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -64,9 +64,9 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; -public class IoTDBSyncConnectorImplV1_1 implements PipeConnector { +public class IoTDBSyncConnectorV1_1 implements PipeConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnectorImplV1_1.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnectorV1_1.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); public static final String IOTDB_SYNC_CONNECTOR_VERSION = 
"1.1"; @@ -163,7 +163,7 @@ public class IoTDBSyncConnectorImplV1_1 implements PipeConnector { "IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); } } catch (TException e) { - LOGGER.error( + LOGGER.warn( "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e); // the connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException( @@ -193,7 +193,7 @@ public class IoTDBSyncConnectorImplV1_1 implements PipeConnector { try { doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent); } catch (TException e) { - LOGGER.error( + LOGGER.warn( "Network error when transfer tsFile insertion event: {}.", tsFileInsertionEvent, e); // The connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException("Network error when transfer tsFile insertion event.", e); @@ -244,7 +244,7 @@ public class IoTDBSyncConnectorImplV1_1 implements PipeConnector { } } } catch (TException e) { - LOGGER.error(String.format("Cannot send pipe data to receiver %s:%s.", ipAddress, port), e); + LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.", ipAddress, port), e); throw new PipeConnectionException(e.getMessage(), e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java index db0047372db..c8f506fdded 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iotdb.db.pipe.connector; +package org.apache.iotdb.db.pipe.connector.v1; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java index 3e1e0db312c..e863bad5dd9 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient; import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq; @@ -133,7 +132,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent."); } } catch (TException e) { - LOGGER.error( + LOGGER.warn( "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e); // the connection may be broken, try to reconnect by catching PipeConnectionException throw new PipeConnectionException( @@ -184,7 +183,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { try { doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent); } catch (TException e) { - LOGGER.error( + LOGGER.warn( "Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e); // the connection may be broken, try to reconnect by catching PipeConnectionException 
throw new PipeConnectionException( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java index dfd6765b68d..5ab2b19c5a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq; @@ -61,7 +61,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { private RandomAccessFile writingFileWriter; @Override - public synchronized TPipeTransferResp handleTransferReq( + public synchronized TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { @@ -297,7 +297,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { } @Override - public IoTDBThriftConnectorVersion getVersion() { - return IoTDBThriftConnectorVersion.VERSION_ONE; + public IoTDBThriftConnectorRequestVersion getVersion() { + return IoTDBThriftConnectorRequestVersion.VERSION_1; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java index 897ee2a29e7..d26939ec54f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.connector.v1.request; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.tsfile.utils.Binary; @@ -58,7 +58,7 @@ public class PipeTransferFilePieceReq extends TPipeTransferReq { filePieceReq.startWritingOffset = startWritingOffset; filePieceReq.filePiece = filePiece; - filePieceReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + filePieceReq.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java index 0e73b95af07..1314e5e5ae0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.connector.v1.request; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import 
org.apache.iotdb.db.pipe.connector.v1.PipeRequestType; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.tsfile.utils.PublicBAOS; @@ -51,7 +51,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq { fileSealReq.fileName = fileName; fileSealReq.fileLength = fileLength; - fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + fileSealReq.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java index da03903fd79..aceb9a5877b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.connector.v1.request; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.tsfile.utils.PublicBAOS; @@ -45,7 +45,7 @@ public class PipeTransferHandshakeReq extends TPipeTransferReq { handshakeReq.timestampPrecision = timestampPrecision; - handshakeReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + handshakeReq.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); handshakeReq.type = PipeRequestType.HANDSHAKE.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = 
new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java index 10098a9721e..157a4200fd6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -83,7 +83,7 @@ public class PipeTransferInsertNodeReq extends TPipeTransferReq { req.insertNode = insertNode; - req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + req.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType(); req.body = insertNode.serializeToByteBuffer(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java index 758013d6c72..ead686a6acf 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException; import 
org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion; import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; @@ -47,12 +47,12 @@ public class PipeTransferTabletReq extends TPipeTransferReq { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletReq.class); private Tablet tablet; - public static TPipeTransferReq toTPipeTransferReq(Tablet tablet) throws IOException { + public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet) throws IOException { final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq(); tabletReq.tablet = tablet; - tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + tabletReq.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion(); tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType(); tabletReq.body = tablet.serialize(); return tabletReq; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java new file mode 100644 index 00000000000..e22d2620cc9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.v2; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient; +import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq; +import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq; +import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferInsertNodeTabletInsertionEventHandler; +import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferRawTabletInsertionEventHandler; +import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferTsFileInsertionEventHandler; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.iotdb.session.util.SessionUtils; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; + +/** + * Non-blocking IoTDB connector that sends pipe events to the receivers configured under {@code + * CONNECTOR_IOTDB_NODE_URLS_KEY} through pooled asynchronous thrift clients. + * + * <p>Every request is tagged with an increasing commit id. Responses may arrive in any order, so + * {@link #commit(long, EnrichedEvent)} buffers finished requests and releases them strictly in + * commit id order. A commit id that is handed out but never committed therefore blocks all later + * commits. + */ +public class IoTDBThriftConnectorV2 implements PipeConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV2.class); + + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + private static final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> +
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER = + new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>() + .createClientManager( + new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + private final AtomicLong commitIdGenerator = new AtomicLong(0); + private final AtomicLong lastCommitId = new AtomicLong(0); + private final PriorityQueue<Pair<Long, Runnable>> commitQueue = + new PriorityQueue<>(Comparator.comparing(o -> o.left)); + + private List<TEndPoint> nodeUrls; + + /** + * Queues the finished request and then releases, in ascending order, every queued request whose + * commit id directly follows the last committed one. + * + * @param requestCommitId commit id that was assigned to the finished request + * @param enrichedEvent event whose reference count is decreased once the request is released, or + * {@code null} if there is nothing to release + */ + public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) { + commitQueue.offer( + new Pair<>( + requestCommitId, + () -> + Optional.ofNullable(enrichedEvent) + .ifPresent( + event -> + event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName())))); + + while (!commitQueue.isEmpty()) { + final Pair<Long, Runnable> committer = commitQueue.peek(); + if (lastCommitId.get() + 1 != committer.left) { + break; + } + + committer.right.run(); + lastCommitId.incrementAndGet(); + + commitQueue.poll(); + } + } + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + // node urls string should be like "localhost:6667,localhost:6668" + validator.validateRequiredAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY); + } + + @Override + public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) + throws Exception { + nodeUrls = + SessionUtils.parseSeedNodeUrls( + Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))); + if (nodeUrls.isEmpty()) { + throw new PipeException("Node urls is empty."); + } + } + + /** + * Performs a blocking handshake with the first configured node only. + * + * @throws PipeException if the receiver answers with a non-success status + * @throws PipeConnectionException if the thrift call itself fails + */ + @Override + public void handshake() throws Exception { + final TEndPoint firstNodeUrl = nodeUrls.get(0); + try (IoTDBThriftConnectorClient client = + new IoTDBThriftConnectorClient( + new ThriftClientProperty.Builder() +
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled()) + .build(), + firstNodeUrl.getIp(), + firstNodeUrl.getPort())) { + final TPipeTransferResp resp = + client.pipeTransfer( + PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision())); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(String.format("Handshake error, result status %s.", resp.status)); + } + } catch (TException e) { + LOGGER.warn( + String.format( + "Connect to receiver %s:%s error.", firstNodeUrl.getIp(), firstNodeUrl.getPort()), + e); + throw new PipeConnectionException(e.getMessage(), e); + } + } + + @Override + public void heartbeat() { + // do nothing + } + + /** + * Builds the transfer request for the given tablet insertion event and hands it to an + * asynchronous handler. + * + * @throws NotImplementedException if the event is neither a {@link + * PipeInsertNodeTabletInsertionEvent} nor a {@link PipeRawTabletInsertionEvent} + */ + @Override + public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { + if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { + final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = + (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent; + final PipeTransferInsertNodeReq pipeTransferInsertNodeReq = + PipeTransferInsertNodeReq.toTPipeTransferReq( + pipeInsertNodeTabletInsertionEvent.getInsertNode()); + // take the commit id only after the request has been built: an id that is handed out but + // never committed would block every later commit + final long requestCommitId = commitIdGenerator.incrementAndGet(); + final PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler = + new PipeTransferInsertNodeTabletInsertionEventHandler( + requestCommitId, pipeInsertNodeTabletInsertionEvent, pipeTransferInsertNodeReq, this); + + transfer(requestCommitId, pipeTransferInsertNodeReqHandler); + } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) tabletInsertionEvent; + final PipeTransferTabletReq pipeTransferTabletReq = +
PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet()); + // same as above: no commit id is consumed when building the request fails + final long requestCommitId = commitIdGenerator.incrementAndGet(); + final PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler = + new PipeTransferRawTabletInsertionEventHandler( + requestCommitId, pipeTransferTabletReq, this); + + transfer(requestCommitId, pipeTransferTabletReqHandler); + } else { + throw new NotImplementedException( + "IoTDBThriftConnectorV2 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent."); + } + } + + /** + * Sends (or re-sends) an insert node request. The target node is chosen by {@code + * requestCommitId % nodeUrls.size()}. Every failure is reported to the handler's {@code + * onError}, which schedules the retry. + */ + public void transfer( + long requestCommitId, + PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler) { + final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size())); + + try { + final AsyncPipeDataTransferServiceClient client = + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + + try { + pipeTransferInsertNodeReqHandler.transfer(client); + } catch (TException e) { + LOGGER.warn( + String.format( + "Transfer insert node to receiver %s:%s error, retrying...", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + e); + // the call failed before it was dispatched, so no callback will fire: trigger the retry + // explicitly, otherwise this commit id is never committed + pipeTransferInsertNodeReqHandler.onError(e); + } + } catch (Exception ex) { + pipeTransferInsertNodeReqHandler.onError(ex); + LOGGER.warn( + String.format( + "Failed to borrow client from client pool for receiver %s:%s.", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + ex); + } + } + + /** + * Sends (or re-sends) a raw tablet request. The target node is chosen by {@code requestCommitId + * % nodeUrls.size()}. Every failure is reported to the handler's {@code onError}, which + * schedules the retry. + */ + public void transfer( + long requestCommitId, + PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) { + final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size())); + + try { + final AsyncPipeDataTransferServiceClient client = + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + + try { + pipeTransferTabletReqHandler.transfer(client); + } catch (TException e) { + LOGGER.warn( + String.format( + "Transfer tablet to receiver %s:%s error, retrying...", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + e); + // the call failed before it was dispatched, so no callback will fire: trigger the retry + // explicitly, otherwise this commit id is never committed + pipeTransferTabletReqHandler.onError(e); + } + } catch (Exception ex) { +
pipeTransferTabletReqHandler.onError(ex); + LOGGER.warn( + String.format( + "Failed to borrow client from client pool for receiver %s:%s.", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + ex); + } + } + + /** + * Waits until the tsfile is closed and then hands it to an asynchronous handler. + * + * @throws NotImplementedException if the event is not a {@link PipeTsFileInsertionEvent} + */ + @Override + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { + throw new NotImplementedException( + "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent."); + } + + final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = + (PipeTsFileInsertionEvent) tsFileInsertionEvent; + // wait first: if waiting fails, neither a commit id nor a file reader has been allocated yet + pipeTsFileInsertionEvent.waitForTsFileClose(); + + // NOTE(review): the handler constructor can still throw FileNotFoundException after the id + // has been taken, which would leave a gap in the commit sequence - confirm how the caller + // recovers from that + final long requestCommitId = commitIdGenerator.incrementAndGet(); + final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler = + new PipeTransferTsFileInsertionEventHandler( + requestCommitId, pipeTsFileInsertionEvent, this); + + transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler); + } + + /** + * Sends (or re-sends) a tsfile. The target node is chosen by {@code requestCommitId % + * nodeUrls.size()}. Every failure is reported to the handler's {@code onError}, which schedules + * the retry. + */ + public void transfer( + long requestCommitId, + PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler) { + final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size())); + + try { + final AsyncPipeDataTransferServiceClient client = + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + + try { + pipeTransferTsFileInsertionEventHandler.transfer(client); + } catch (TException e) { + LOGGER.warn( + String.format( + "Transfer tsfile to receiver %s:%s error, retrying...", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + e); + // the call failed before it was dispatched, so no callback will fire: trigger the retry + // explicitly, otherwise this commit id is never committed + pipeTransferTsFileInsertionEventHandler.onError(e); + } + } catch (Exception ex) { + pipeTransferTsFileInsertionEventHandler.onError(ex); + LOGGER.warn( + String.format( + "Failed to borrow client from client pool for receiver %s:%s.", + targetNodeUrl.getIp(), targetNodeUrl.getPort()), + ex); + } + } + + @Override + public void transfer(Event event) { + LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic event: {}.", event); + } + + /** Marks the connector as closed; handlers check {@link #isClosed()} before retrying. */ + @Override +
public void close() { + isClosed.set(true); + } + + public boolean isClosed() { + return isClosed.get(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java new file mode 100644 index 00000000000..c8d95c8691e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.db.pipe.connector.v2.handler; + +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; + +import javax.annotation.Nullable; + +/** + * Asynchronous callback for a tablet insertion request that was built from an insert node. + * + * <p>Sending and retrying are delegated to the given client and connector; all state is kept by + * the superclass. + */ +public class PipeTransferInsertNodeTabletInsertionEventHandler + extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> { + /** + * @param requestCommitId commit id assigned to this request by the connector + * @param event event that produced the request, may be {@code null} + * @param req serialized request to send + * @param connector connector used for retrying the request + */ + public PipeTransferInsertNodeTabletInsertionEventHandler( + long requestCommitId, + @Nullable EnrichedEvent event, + TPipeTransferReq req, + IoTDBThriftConnectorV2 connector) { + super(requestCommitId, event, req, connector); + } + + /** Sends the request with this handler registered as the result callback. */ + @Override + protected void doTransfer(AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) + throws TException { + client.pipeTransfer(req, this); + } + + /** Hands this handler back to the connector, which resolves to the insert node overload. */ + @Override + protected void retryTransfer(IoTDBThriftConnectorV2 connector, long requestCommitId) { + connector.transfer(requestCommitId, this); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java similarity index 51% copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java copy to server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java index 4230ee73a34..d0096eaafc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java @@ -17,20 +17,31 @@ * under the License.
*/ -package org.apache.iotdb.db.pipe.agent.receiver; +package org.apache.iotdb.db.pipe.connector.v2.handler; -import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; -import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorVersion; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; -public interface IoTDBThriftReceiver { +import org.apache.thrift.TException; - IoTDBThriftConnectorVersion getVersion(); +/** + * Asynchronous callback for a tablet insertion request that was built from a raw tablet. + * + * <p>This handler always passes {@code null} as the event to its superclass. + */ +public class PipeTransferRawTabletInsertionEventHandler + extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> { - TPipeTransferResp handleTransferReq( - TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher); + /** + * @param requestCommitId commit id assigned to this request by the connector + * @param req serialized request to send + * @param connector connector used for retrying the request + */ + public PipeTransferRawTabletInsertionEventHandler( + long requestCommitId, TPipeTransferReq req, IoTDBThriftConnectorV2 connector) { + super(requestCommitId, null, req, connector); + } - void handleExit(); + /** Sends the request with this handler registered as the result callback. */ + @Override + protected void doTransfer(AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) + throws TException { + client.pipeTransfer(req, this); + } + + /** Hands this handler back to the connector, which resolves to the raw tablet overload. */ + @Override + protected void retryTransfer(IoTDBThriftConnectorV2 connector, long requestCommitId) { + connector.transfer(requestCommitId, this); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java new file mode 100644 index 00000000000..6b150c53357 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.v2.handler; + +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; +import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * Base asynchronous callback for transferring one tablet insertion request. + * + * <p>A successful response commits the request to the connector. Any failure schedules a retry + * after a delay that doubles with every attempt and is capped at {@code MAX_RETRY_WAIT_TIME_MS}; + * retrying stops once the connector reports that it is closed. + * + * @param <E> response type delivered to this callback + */ +public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTransferResp> + implements AsyncMethodCallback<E> { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class); + + private final long requestCommitId; + private final EnrichedEvent event; + private final TPipeTransferReq req; + + private final IoTDBThriftConnectorV2 connector; +
+ /* upper bound of the retry delay: the configured retry interval multiplied by 2^5 */ + private static final long MAX_RETRY_WAIT_TIME_MS = + (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2, 5)); + private int retryCount = 0; + + /** + * Creates the handler and, if an event is given, increases its reference count. + * + * @param requestCommitId commit id assigned to this request by the connector + * @param event event that produced the request, may be {@code null} + * @param req serialized request to send + * @param connector connector that is notified on success and used for retrying + */ + public PipeTransferTabletInsertionEventHandler( + long requestCommitId, + @Nullable EnrichedEvent event, + TPipeTransferReq req, + IoTDBThriftConnectorV2 connector) { + this.requestCommitId = requestCommitId; + this.event = event; + this.req = req; + this.connector = connector; + + Optional.ofNullable(event) + .ifPresent( + e -> e.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName())); + } + + /** + * Sends the stored request through the given client. + * + * @throws TException if the subclass fails to issue the call + */ + public void transfer(AsyncPipeDataTransferServiceClient client) throws TException { + doTransfer(client, req); + } + + /** Issues the actual asynchronous call for {@code req} on {@code client}. */ + protected abstract void doTransfer( + AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) throws TException; + + /** Commits the request on a success status; any other outcome is routed to {@link #onError}. */ + @Override + public void onComplete(TPipeTransferResp response) { + // just in case + if (response == null) { + onError(new PipeException("TPipeTransferResp is null")); + return; + } + + if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + connector.commit(requestCommitId, event); + } else { + onError(new PipeException(response.getStatus().getMessage())); + } + } + + /** + * Counts the failure and schedules an asynchronous retry after the backoff delay. The retry is + * skipped if the connector has been closed by the time the delay has elapsed. + */ + @Override + public void onError(Exception exception) { + ++retryCount; + + CompletableFuture.runAsync( + () -> { + try { + Thread.sleep( + Math.min( + (long) + (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() + * Math.pow(2d, retryCount - 1d)), + MAX_RETRY_WAIT_TIME_MS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption during retrying", e); + } + + if (connector.isClosed()) { + LOGGER.info( + "IoTDBThriftConnectorV2 has been stopped, we will not retry this request {} after {} times", + req, + retryCount, + exception); + } else { + LOGGER.warn( + "IoTDBThriftConnectorV2 failed to transfer request {} after {} times, retrying...", + req, + retryCount, + exception); + +
retryTransfer(connector, requestCommitId); + } + }); + } + + /** Re-submits this handler to the connector under the same commit id. */ + protected abstract void retryTransfer(IoTDBThriftConnectorV2 connector, long requestCommitId); +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java new file mode 100644 index 00000000000..21c244f0537 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.db.pipe.connector.v2.handler; + +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; +import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; +import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq; +import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PipeTransferTsFileInsertionEventHandler + implements AsyncMethodCallback<TPipeTransferResp> { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class); + + private final long requestCommitId; + private final PipeTsFileInsertionEvent event; + private final IoTDBThriftConnectorV2 connector; + + private final File tsFile; + private final int readFileBufferSize; + private final byte[] readBuffer; + private long position; + + private RandomAccessFile reader; + + private final AtomicBoolean isSealSignalSent; + + private AsyncPipeDataTransferServiceClient client; + + private static final long MAX_RETRY_WAIT_TIME_MS = + (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2, 5)); + private int retryCount = 0; + + public 
PipeTransferTsFileInsertionEventHandler( + long requestCommitId, PipeTsFileInsertionEvent event, IoTDBThriftConnectorV2 connector) + throws FileNotFoundException { + this.requestCommitId = requestCommitId; + this.event = event; + this.connector = connector; + + tsFile = event.getTsFile(); + readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); + readBuffer = new byte[readFileBufferSize]; + position = 0; + + reader = new RandomAccessFile(tsFile, "r"); + + isSealSignalSent = new AtomicBoolean(false); + + event.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName()); + } + + public void transfer(AsyncPipeDataTransferServiceClient client) throws TException, IOException { + this.client = client; + client.setShouldReturnSelf(false); + + final int readLength = reader.read(readBuffer); + + if (readLength == -1) { + isSealSignalSent.set(true); + client.pipeTransfer( + PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()), this); + return; + } + + client.pipeTransfer( + PipeTransferFilePieceReq.toTPipeTransferReq( + tsFile.getName(), + position, + readLength == readFileBufferSize + ? 
readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength)), + this); + position += readLength; + } + + @Override + public void onComplete(TPipeTransferResp response) { + if (isSealSignalSent.get()) { + if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + onError( + new PipeException( + String.format( + "Seal file %s error, result status %s.", tsFile, response.getStatus()))); + return; + } + + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close file reader.", e); + } finally { + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + } + + connector.commit(requestCommitId, event); + } + return; + } + + // if the isSealSignalSent is false, then the response must be a PipeTransferFilePieceResp + try { + final PipeTransferFilePieceResp resp = + PipeTransferFilePieceResp.fromTPipeTransferResp(response); + + // this case only happens when the connection is broken, and the connector is reconnected + // to the receiver, then the receiver will redirect the file position to the last position + final long code = resp.getStatus().getCode(); + + if (code == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { + position = resp.getEndWritingOffset(); + reader.seek(position); + LOGGER.info(String.format("Redirect file position to %s.", position)); + } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException( + String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus())); + } + + transfer(client); + } catch (Exception e) { + onError(e); + } + } + + @Override + public void onError(Exception exception) { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close file reader.", e); + } finally { + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + } + } + + ++retryCount; + + CompletableFuture.runAsync( + () -> 
{ + try { + Thread.sleep( + Math.min( + (long) + (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() + * Math.pow(2d, retryCount - 1d)), + MAX_RETRY_WAIT_TIME_MS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption during retrying", e); + } + + if (connector.isClosed()) { + LOGGER.info( + "IoTDBThriftConnectorV2 has been stopped, we will not retry to transfer tsfile {}.", + tsFile, + exception); + } else { + LOGGER.warn( + "IoTDBThriftConnectorV2 failed to transfer tsfile {} after {} times, retrying...", + tsFile, + retryCount, + exception); + + try { + position = 0; + reader = new RandomAccessFile(tsFile, "r"); + isSealSignalSent.set(false); + + connector.transfer(requestCommitId, this); + } catch (FileNotFoundException e) { + LOGGER.error("Exception occurred when retrying...", e); + onError(e); + } + } + }); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 27975236127..f2df19bf2b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -57,7 +57,7 @@ public abstract class EnrichedEvent implements Event { boolean isSuccessful = true; synchronized (this) { if (referenceCount.get() == 0) { - isSuccessful = increaseResourceReferenceCount(holderMessage); + isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage); } referenceCount.incrementAndGet(); } @@ -71,7 +71,7 @@ public abstract class EnrichedEvent implements Event { * @return true if the reference count is increased successfully, false if the event is not * controlled by the invoker, which means the data stored in the event is not safe to use */ - public abstract boolean increaseResourceReferenceCount(String holderMessage); + public abstract boolean internallyIncreaseResourceReferenceCount(String 
holderMessage); /** * Decrease the reference count of this event. If the reference count is decreased to 0, the event @@ -85,7 +85,7 @@ public abstract class EnrichedEvent implements Event { boolean isSuccessful = true; synchronized (this) { if (referenceCount.get() == 1) { - isSuccessful = decreaseResourceReferenceCount(holderMessage); + isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage); reportProgress(); } referenceCount.decrementAndGet(); @@ -100,7 +100,7 @@ public abstract class EnrichedEvent implements Event { * @param holderMessage the message of the invoker * @return true if the reference count is decreased successfully, false otherwise */ - public abstract boolean decreaseResourceReferenceCount(String holderMessage); + public abstract boolean internallyDecreaseResourceReferenceCount(String holderMessage); private void reportProgress() { if (pipeTaskMeta != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 8a3f9ca8a77..5623d7715a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -70,7 +70,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent /////////////////////////// EnrichedEvent /////////////////////////// @Override - public boolean increaseResourceReferenceCount(String holderMessage) { + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { try { PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), walEntryHandler); return true; @@ -85,7 +85,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } @Override - public boolean decreaseResourceReferenceCount(String holderMessage) { + public 
boolean internallyDecreaseResourceReferenceCount(String holderMessage) { try { PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId()); return true; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d82313dad8d..40aaf53c1ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -107,7 +107,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns /////////////////////////// EnrichedEvent /////////////////////////// @Override - public boolean increaseResourceReferenceCount(String holderMessage) { + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { try { tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true); return true; @@ -122,7 +122,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } @Override - public boolean decreaseResourceReferenceCount(String holderMessage) { + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { try { PipeResourceManager.file().decreaseFileReference(tsFile); return true; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java index 06410b26a02..cb84d0bc11a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeCollectEvent.java @@ -95,8 +95,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent { } @Override - public boolean increaseResourceReferenceCount(String holderMessage) { - return 
event.increaseResourceReferenceCount(holderMessage); + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + return event.internallyIncreaseResourceReferenceCount(holderMessage); } @Override @@ -109,8 +109,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent { } @Override - public boolean decreaseResourceReferenceCount(String holderMessage) { - return event.decreaseResourceReferenceCount(holderMessage); + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + return event.internallyDecreaseResourceReferenceCount(holderMessage); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java index fadb6a0184f..829e0bb993b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java @@ -24,8 +24,9 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorImplV1_1; +import org.apache.iotdb.db.pipe.connector.lagacy.IoTDBSyncConnectorV1_1; import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1; +import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue; import org.apache.iotdb.pipe.api.PipeConnector; @@ -61,11 +62,15 @@ public class PipeConnectorSubtaskManager { BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()); PipeConnector pipeConnector; - 
if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())) { + if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()) + || connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V1.getPipePluginName())) { pipeConnector = new IoTDBThriftConnectorV1(); + } else if (connectorKey.equals( + BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V2.getPipePluginName())) { + pipeConnector = new IoTDBThriftConnectorV2(); } else if (connectorKey.equals( BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) { - pipeConnector = new IoTDBSyncConnectorImplV1_1(); + pipeConnector = new IoTDBSyncConnectorV1_1(); } else { pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters); } @@ -126,15 +131,6 @@ public class PipeConnectorSubtaskManager { attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop(); } - public PipeConnectorSubtask getPipeConnectorSubtask(String attributeSortedString) { - if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException( - "Failed to get PipeConnectorSubtask. 
No such subtask: " + attributeSortedString); - } - - return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask(); - } - public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue( String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 02f1c414fe0..a1a4104820e 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -2103,7 +2103,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TPipeTransferResp pipeTransfer(TPipeTransferReq req) { - return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher); + return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher); } @Override
