This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-parallel-connector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dc127246e640e3cfc1f4aabcf5cc4a30becf5ce7 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jun 8 12:36:30 2023 +0800 prepare for async client mode dev iteration: v2 --- .../impl/pipe/runtime/PipeHandleMetaChangeProcedure.java | 2 +- .../apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java | 2 +- .../apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java | 8 ++++---- .../iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java | 3 ++- .../apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java | 4 ++-- .../db/pipe/connector/v1/request/PipeTransferFilePieceReq.java | 2 +- .../db/pipe/connector/v1/request/PipeTransferFileSealReq.java | 2 +- .../db/pipe/connector/v1/request/PipeTransferHandshakeReq.java | 2 +- .../db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java | 2 +- .../iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java | 2 +- .../apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java | 2 +- 11 files changed, 16 insertions(+), 15 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 31ebb405907..435b3e0b41f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -106,7 +106,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV final PipeMeta pipeMetaFromDataNode = pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta()); if (pipeMetaFromDataNode == null) { - LOGGER.warn( + LOGGER.info( "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, " + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}", pipeMetaOnConfigNode); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java index 4230ee73a34..91c2b980da3 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java @@ -29,7 +29,7 @@ public interface IoTDBThriftReceiver { IoTDBThriftConnectorVersion getVersion(); - TPipeTransferResp handleTransferReq( + TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher); void handleExit(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java index fdeda91ea56..97489d7e433 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java @@ -37,11 +37,11 @@ public class PipeReceiverAgent { private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>(); - public TPipeTransferResp transfer( + public TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { final byte reqVersion = req.getVersion(); - if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) { - return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, schemaFetcher); + if (reqVersion == IoTDBThriftConnectorVersion.VERSION_1.getVersion()) { + return getReceiver(reqVersion).receive(req, partitionFetcher, schemaFetcher); } else { return new TPipeTransferResp( RpcUtils.getStatus( @@ -71,7 +71,7 @@ public class PipeReceiverAgent { } private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) { - if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) { + if (reqVersion == IoTDBThriftConnectorVersion.VERSION_1.getVersion()) { receiverThreadLocal.set(new IoTDBThriftReceiverV1()); } else { throw new UnsupportedOperationException( diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java index 7dac858ae3f..8b7328a0425 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java @@ -20,7 +20,8 @@ package org.apache.iotdb.db.pipe.connector; public enum IoTDBThriftConnectorVersion { - VERSION_ONE((byte) 1), + VERSION_1((byte) 1), + VERSION_2((byte) 1), ; private final byte version; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java index dfd6765b68d..9c4d689d20b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java @@ -61,7 +61,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { private RandomAccessFile writingFileWriter; @Override - public synchronized TPipeTransferResp handleTransferReq( + public synchronized TPipeTransferResp receive( TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { @@ -298,6 +298,6 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { @Override public IoTDBThriftConnectorVersion getVersion() { - return IoTDBThriftConnectorVersion.VERSION_ONE; + return IoTDBThriftConnectorVersion.VERSION_1; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java index 897ee2a29e7..a40e6195fdd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java @@ -58,7 +58,7 @@ public class PipeTransferFilePieceReq extends TPipeTransferReq { filePieceReq.startWritingOffset = startWritingOffset; filePieceReq.filePiece = filePiece; - filePieceReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + filePieceReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion(); filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java index 0e73b95af07..8f6e7186988 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java @@ -51,7 +51,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq { fileSealReq.fileName = fileName; fileSealReq.fileLength = fileLength; - fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion(); fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java index da03903fd79..b8b9b9f38ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java @@ -45,7 +45,7 @@ public class PipeTransferHandshakeReq extends TPipeTransferReq { handshakeReq.timestampPrecision = timestampPrecision; - handshakeReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + handshakeReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion(); handshakeReq.type = PipeRequestType.HANDSHAKE.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java index 10098a9721e..f852cf936d4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java @@ -83,7 +83,7 @@ public class PipeTransferInsertNodeReq extends TPipeTransferReq { req.insertNode = insertNode; - req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + req.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion(); req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType(); req.body = insertNode.serializeToByteBuffer(); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java index 758013d6c72..558ed43620b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java @@ -52,7 +52,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq { tabletReq.tablet = tablet; - tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion(); + tabletReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion(); tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType(); tabletReq.body = tablet.serialize(); return tabletReq; diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 28bec0979bd..4fbe17270f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -2111,7 +2111,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TPipeTransferResp pipeTransfer(TPipeTransferReq req) { - return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher); + return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher); } @Override
