This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-parallel-connector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dc127246e640e3cfc1f4aabcf5cc4a30becf5ce7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 8 12:36:30 2023 +0800

    prepare for async client mode dev iteration: v2
---
 .../impl/pipe/runtime/PipeHandleMetaChangeProcedure.java          | 2 +-
 .../apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java  | 2 +-
 .../apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java    | 8 ++++----
 .../iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java      | 3 ++-
 .../apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java  | 4 ++--
 .../db/pipe/connector/v1/request/PipeTransferFilePieceReq.java    | 2 +-
 .../db/pipe/connector/v1/request/PipeTransferFileSealReq.java     | 2 +-
 .../db/pipe/connector/v1/request/PipeTransferHandshakeReq.java    | 2 +-
 .../db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java   | 2 +-
 .../iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java | 2 +-
 .../apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java | 2 +-
 11 files changed, 16 insertions(+), 15 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 31ebb405907..435b3e0b41f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -106,7 +106,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       final PipeMeta pipeMetaFromDataNode =
           pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
       if (pipeMetaFromDataNode == null) {
-        LOGGER.warn(
+        LOGGER.info(
             "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
                 + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
             pipeMetaOnConfigNode);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
index 4230ee73a34..91c2b980da3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
@@ -29,7 +29,7 @@ public interface IoTDBThriftReceiver {
 
   IoTDBThriftConnectorVersion getVersion();
 
-  TPipeTransferResp handleTransferReq(
+  TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher);
 
   void handleExit();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index fdeda91ea56..97489d7e433 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -37,11 +37,11 @@ public class PipeReceiverAgent {
 
   private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new 
ThreadLocal<>();
 
-  public TPipeTransferResp transfer(
+  public TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     final byte reqVersion = req.getVersion();
-    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
-      return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, 
schemaFetcher);
+    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_1.getVersion()) {
+      return getReceiver(reqVersion).receive(req, partitionFetcher, 
schemaFetcher);
     } else {
       return new TPipeTransferResp(
           RpcUtils.getStatus(
@@ -71,7 +71,7 @@ public class PipeReceiverAgent {
   }
 
   private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
-    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_1.getVersion()) {
       receiverThreadLocal.set(new IoTDBThriftReceiverV1());
     } else {
       throw new UnsupportedOperationException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
index 7dac858ae3f..8b7328a0425 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorVersion.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.pipe.connector;
 
 public enum IoTDBThriftConnectorVersion {
-  VERSION_ONE((byte) 1),
+  VERSION_1((byte) 1),
+  VERSION_2((byte) 1),
   ;
 
   private final byte version;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
index dfd6765b68d..9c4d689d20b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
@@ -61,7 +61,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   private RandomAccessFile writingFileWriter;
 
   @Override
-  public synchronized TPipeTransferResp handleTransferReq(
+  public synchronized TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
     final short rawRequestType = req.getType();
     if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
@@ -298,6 +298,6 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
 
   @Override
   public IoTDBThriftConnectorVersion getVersion() {
-    return IoTDBThriftConnectorVersion.VERSION_ONE;
+    return IoTDBThriftConnectorVersion.VERSION_1;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
index 897ee2a29e7..a40e6195fdd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFilePieceReq.java
@@ -58,7 +58,7 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     filePieceReq.startWritingOffset = startWritingOffset;
     filePieceReq.filePiece = filePiece;
 
-    filePieceReq.version = 
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    filePieceReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion();
     filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
index 0e73b95af07..8f6e7186988 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferFileSealReq.java
@@ -51,7 +51,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq 
{
     fileSealReq.fileName = fileName;
     fileSealReq.fileLength = fileLength;
 
-    fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion();
     fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
index da03903fd79..b8b9b9f38ad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferHandshakeReq.java
@@ -45,7 +45,7 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
 
     handshakeReq.timestampPrecision = timestampPrecision;
 
-    handshakeReq.version = 
IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    handshakeReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion();
     handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
index 10098a9721e..f852cf936d4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferInsertNodeReq.java
@@ -83,7 +83,7 @@ public class PipeTransferInsertNodeReq extends 
TPipeTransferReq {
 
     req.insertNode = insertNode;
 
-    req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    req.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion();
     req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
     req.body = insertNode.serializeToByteBuffer();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
index 758013d6c72..558ed43620b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/request/PipeTransferTabletReq.java
@@ -52,7 +52,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
 
     tabletReq.tablet = tablet;
 
-    tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    tabletReq.version = IoTDBThriftConnectorVersion.VERSION_1.getVersion();
     tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
     tabletReq.body = tablet.serialize();
     return tabletReq;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..4fbe17270f6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -2111,7 +2111,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
-    return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher);
+    return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher);
   }
 
   @Override

Reply via email to