This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch remove-sync-entry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6f8a8020e5052c3d93b7efb59d1fc7a4bf29e7f5 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 17 03:55:28 2023 +0800 rename IoTDBSyncReceiverV1_1 --- .../commons/pipe/plugin/builtin/BuiltinPipePlugin.java | 4 ++-- ...BSyncConnectorV1_1.java => IoTDBSyncConnector.java} | 2 +- ...BSyncConnectorV1_1.java => IoTDBSyncConnector.java} | 10 +++++----- ...TDBSyncReceiverV1_1.java => IoTDBSyncReceiver.java} | 18 +++++++++--------- .../pipe/task/subtask/PipeConnectorSubtaskManager.java | 7 +++---- .../db/service/thrift/impl/ClientRPCServiceImpl.java | 10 +++++----- 6 files changed, 25 insertions(+), 26 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index e8ef0018810..14d22f578c3 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.plugin.builtin; import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector; -import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnectorV1_1; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBSyncConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV1; import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnectorV2; @@ -40,7 +40,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class), IOTDB_THRIFT_CONNECTOR_V1("iotdb_thrift_connector_v1", IoTDBThriftConnectorV1.class), IOTDB_THRIFT_CONNECTOR_V2("iotdb_thrift_connector_v2", IoTDBThriftConnectorV2.class), - IOTDB_SYNC_CONNECTOR_V_1_1("iotdb_sync_connector_v1.1", IoTDBSyncConnectorV1_1.class), + IOTDB_SYNC_CONNECTOR("iotdb_sync_connector", IoTDBSyncConnector.class), ; private final String pipePluginName; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnector.java similarity index 97% rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnector.java index d994b096353..7a06e2b8824 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnectorV1_1.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBSyncConnector.java @@ -33,7 +33,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; * here. The pipe agent in the server module will replace this class with the real implementation * when initializing the IoTDB Sync connector. */ -public class IoTDBSyncConnectorV1_1 implements PipeConnector { +public class IoTDBSyncConnector implements PipeConnector { @Override public void validate(PipeParameterValidator validator) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnectorV1_1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java similarity index 96% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnectorV1_1.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java index 5f6b6f84906..a7ebbecff51 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnectorV1_1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java @@ -64,9 +64,9 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; -public class IoTDBSyncConnectorV1_1 implements PipeConnector { +public class IoTDBSyncConnector implements PipeConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnectorV1_1.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncConnector.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); public static final String IOTDB_SYNC_CONNECTOR_VERSION = "1.1"; @@ -160,7 +160,7 @@ public class IoTDBSyncConnectorV1_1 implements PipeConnector { doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent); } else { throw new NotImplementedException( - "IoTDBSyncConnectorV1_1 only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); + "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); } } catch (TException e) { LOGGER.warn( @@ -187,7 +187,7 @@ public class IoTDBSyncConnectorV1_1 implements PipeConnector { public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) { throw new NotImplementedException( - "IoTDBSyncConnectorV1_1 only support PipeTsFileInsertionEvent."); + "IoTDBSyncConnector only support PipeTsFileInsertionEvent."); } try { @@ -251,7 +251,7 @@ public class IoTDBSyncConnectorV1_1 implements PipeConnector { @Override public void transfer(Event event) throws Exception { - LOGGER.warn("IoTDBSyncConnectorV1_1 does not support transfer generic event: {}.", event); + LOGGER.warn("IoTDBSyncConnector does not support transfer generic event: {}.", event); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiverV1_1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiverV1_1.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java index 64b71888643..79480a4ea2c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiverV1_1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java @@ -58,17 +58,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** This class is responsible for implementing the RPC processing on the receiver-side. */ -public class IoTDBSyncReceiverV1_1 { +public class IoTDBSyncReceiver { - public static IoTDBSyncReceiverV1_1 getInstance() { - return IoTDBSyncReceiverV1_1Holder.INSTANCE; + public static IoTDBSyncReceiver getInstance() { + return IoTDBSyncReceiverHolder.INSTANCE; } - private static class IoTDBSyncReceiverV1_1Holder { - private static final IoTDBSyncReceiverV1_1 INSTANCE = new IoTDBSyncReceiverV1_1(); + private static class IoTDBSyncReceiverHolder { + private static final IoTDBSyncReceiver INSTANCE = new IoTDBSyncReceiver(); } - private static final Logger logger = LoggerFactory.getLogger(IoTDBSyncReceiverV1_1.class); + private static final Logger logger = LoggerFactory.getLogger(IoTDBSyncReceiver.class); private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -83,7 +83,7 @@ public class IoTDBSyncReceiverV1_1 { // The sync connectionId is unique in one IoTDB instance. private final AtomicLong connectionIdGenerator; - private IoTDBSyncReceiverV1_1() { + private IoTDBSyncReceiver() { currentConnectionId = new ThreadLocal<>(); connectionIdToIdentityInfoMap = new ConcurrentHashMap<>(); connectionIdToStartIndexRecord = new ConcurrentHashMap<>(); @@ -232,7 +232,7 @@ public class IoTDBSyncReceiverV1_1 { * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link * TSStatusCode#SUCCESS_STATUS} if load successfully. * @throws TException The connection between the sender and the receiver has not been established - * by {@link IoTDBSyncReceiverV1_1#handshake} + * by {@link IoTDBSyncReceiver#handshake} */ public TSStatus transportPipeData(ByteBuffer buff) throws TException { // step1. check connection @@ -288,7 +288,7 @@ public class IoTDBSyncReceiverV1_1 { * TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to rollback because * mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive file. * @throws TException The connection between the sender and the receiver has not been established - * by {@link IoTDBSyncReceiverV1_1#handshake} + * by {@link IoTDBSyncReceiver#handshake} */ public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java index 149808e1cb8..071ab4da15e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java @@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; -import org.apache.iotdb.db.pipe.connector.legacy.IoTDBSyncConnectorV1_1; +import org.apache.iotdb.db.pipe.connector.legacy.IoTDBSyncConnector; import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1; import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2; import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor; @@ -68,9 +68,8 @@ public class PipeConnectorSubtaskManager { } else if (connectorKey.equals( BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR_V2.getPipePluginName())) { pipeConnector = new IoTDBThriftConnectorV2(); - } else if (connectorKey.equals( - BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR_V_1_1.getPipePluginName())) { - pipeConnector = new IoTDBSyncConnectorV1_1(); + } else if (connectorKey.equals(BuiltinPipePlugin.IOTDB_SYNC_CONNECTOR.getPipePluginName())) { + pipeConnector = new IoTDBSyncConnector(); } else { pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index c29da22ab37..8a1f9e323d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -63,7 +63,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTempla import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.pipe.agent.PipeAgent; -import org.apache.iotdb.db.pipe.connector.legacy.IoTDBSyncReceiverV1_1; +import org.apache.iotdb.db.pipe.connector.legacy.IoTDBSyncReceiver; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager; @@ -2082,7 +2082,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus handshake(TSyncIdentityInfo info) throws TException { - return IoTDBSyncReceiverV1_1.getInstance() + return IoTDBSyncReceiver.getInstance() .handshake( info, SESSION_MANAGER.getCurrSession().getClientAddress(), @@ -2092,12 +2092,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus sendPipeData(ByteBuffer buff) throws TException { - return IoTDBSyncReceiverV1_1.getInstance().transportPipeData(buff); + return IoTDBSyncReceiver.getInstance().transportPipeData(buff); } @Override public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff) throws TException { - return IoTDBSyncReceiverV1_1.getInstance().transportFile(metaInfo, buff); + return IoTDBSyncReceiver.getInstance().transportFile(metaInfo, buff); } @Override @@ -2242,7 +2242,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSCloseSessionReq req = new TSCloseSessionReq(); closeSession(req); } - IoTDBSyncReceiverV1_1.getInstance().handleClientExit(); + IoTDBSyncReceiver.getInstance().handleClientExit(); PipeAgent.receiver().handleClientExit(); } }
