This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d66d6445ed [IOTDB-4714] Rename TPipeInfo to TCreatePipeReq (#7692)
d66d6445ed is described below
commit d66d6445edbae6e5b747e82385c5951e908eb457
Author: Chen YZ <[email protected]>
AuthorDate: Sat Oct 22 14:12:25 2022 +0800
[IOTDB-4714] Rename TPipeInfo to TCreatePipeReq (#7692)
---
.../iotdb/confignode/manager/ConfigManager.java | 6 +++---
.../org/apache/iotdb/confignode/manager/IManager.java | 6 +++---
.../iotdb/confignode/manager/ProcedureManager.java | 4 ++--
.../procedure/impl/sync/CreatePipeProcedure.java | 6 +++---
.../service/thrift/ConfigNodeRPCServiceProcessor.java | 4 ++--
.../procedure/impl/OperatePipeProcedureTest.java | 8 ++++----
.../commons/utils/ThriftConfigNodeSerDeUtils.java | 19 -------------------
.../org/apache/iotdb/db/client/ConfigNodeClient.java | 4 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 8 ++++----
.../org/apache/iotdb/db/utils/sync/SyncPipeUtil.java | 4 ++--
thrift-confignode/src/main/thrift/confignode.thrift | 4 ++--
11 files changed, 27 insertions(+), 46 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ebcda2da27..73e2c79ca4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -88,6 +88,7 @@ import
org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -106,7 +107,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1058,10 +1058,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus createPipe(TPipeInfo pipeInfo) {
+ public TSStatus createPipe(TCreatePipeReq req) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return procedureManager.createPipe(pipeInfo);
+ return procedureManager.createPipe(req);
} else {
return status;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index b8cc55c7c1..8080014a44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -67,7 +68,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -433,10 +433,10 @@ public interface IManager {
/**
* Create Pipe
*
- * @param pipeInfo Info about Pipe
+ * @param req Info about Pipe
* @return TSStatus
*/
- TSStatus createPipe(TPipeInfo pipeInfo);
+ TSStatus createPipe(TCreatePipeReq req);
/**
* Start Pipe
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5abea8f104..bf84a46e89 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -56,8 +56,8 @@ import
org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.rpc.RpcUtils;
@@ -272,7 +272,7 @@ public class ProcedureManager {
}
}
- public TSStatus createPipe(TPipeInfo req) {
+ public TSStatus createPipe(TCreatePipeReq req) {
try {
long procedureId = executor.submitProcedure(new
CreatePipeProcedure(req));
List<TSStatus> statusList = new ArrayList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
index f8945009b3..fdaa1a2a13 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -28,7 +28,7 @@ import
org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -50,9 +50,9 @@ public class CreatePipeProcedure extends
AbstractOperatePipeProcedure {
super();
}
- public CreatePipeProcedure(TPipeInfo pipeInfo) throws PipeException {
+ public CreatePipeProcedure(TCreatePipeReq req) throws PipeException {
super();
- this.pipeInfo = SyncPipeUtil.parseTPipeInfoAsPipeInfo(pipeInfo, System.currentTimeMillis());
+ this.pipeInfo = SyncPipeUtil.parseTCreatePipeReqAsPipeInfo(req, System.currentTimeMillis());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index edf57c6ea9..73c7357ae2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -74,6 +74,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
@@ -106,7 +107,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -645,7 +645,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus createPipe(TPipeInfo req) {
+ public TSStatus createPipe(TCreatePipeReq req) {
return configManager.createPipe(req);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
index 16190ea3eb..ef4e8af6c5 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Test;
@@ -45,14 +45,14 @@ public class OperatePipeProcedureTest {
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
Map<String, String> attributes = new HashMap<>();
attributes.put("syncdelop", "false");
- TPipeInfo pipeInfo =
- new TPipeInfo()
+ TCreatePipeReq req =
+ new TCreatePipeReq()
.setPipeName("PipeName")
.setPipeSinkName("PipeSinkName")
.setStartTime(999)
.setAttributes(attributes);
- CreatePipeProcedure p1 = new CreatePipeProcedure(pipeInfo);
+ CreatePipeProcedure p1 = new CreatePipeProcedure(req);
try {
p1.serialize(outputStream);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 36ba2d3415..1677caacc9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -150,22 +149,4 @@ public class ThriftConfigNodeSerDeUtils {
}
return pipeSinkInfo;
}
-
- public static void serializeTPipeInfo(TPipeInfo pipeInfo, DataOutputStream stream) {
- try {
- pipeInfo.write(generateWriteProtocol(stream));
- } catch (TException e) {
- throw new ThriftSerDeException("Write TPipeInfo failed: ", e);
- }
- }
-
- public static TPipeInfo deserializeTPipeInfo(ByteBuffer buffer) {
- TPipeInfo pipeInfo = new TPipeInfo();
- try {
- pipeInfo.read(generateReadProtocol(buffer));
- } catch (TException e) {
- throw new ThriftSerDeException("Read TPipeInfo failed: ", e);
- }
- return pipeInfo;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index df0e0b65e8..5baf7350d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
@@ -73,7 +74,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -1130,7 +1130,7 @@ public class ConfigNodeClient
}
@Override
- public TSStatus createPipe(TPipeInfo req) throws TException {
+ public TSStatus createPipe(TCreatePipeReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
TSStatus status = client.createPipe(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4857dbcd40..fff5a5545b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
@@ -49,7 +50,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -889,13 +889,13 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TPipeInfo pipeInfo =
- new TPipeInfo()
+ TCreatePipeReq req =
+ new TCreatePipeReq()
.setPipeName(createPipeStatement.getPipeName())
.setPipeSinkName(createPipeStatement.getPipeSinkName())
.setStartTime(createPipeStatement.getStartTime())
.setAttributes(createPipeStatement.getPipeAttributes());
- TSStatus tsStatus = configNodeClient.createPipe(pipeInfo);
+ TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.error(
"Failed to create PIPE {} in config node, status is {}.",
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index b8ce74947e..9dfffffc79 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -137,7 +137,7 @@ public class SyncPipeUtil {
}
/** parse TPipeInfo to PipeInfo */
- public static PipeInfo parseTPipeInfoAsPipeInfo(TPipeInfo pipeInfo, long pipeCreateTime)
+ public static PipeInfo parseTCreatePipeReqAsPipeInfo(TCreatePipeReq pipeInfo, long pipeCreateTime)
throws PipeException {
boolean syncDelOp = true;
for (Map.Entry<String, String> entry :
pipeInfo.getAttributes().entrySet()) {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 4af568e3e0..cb7d3d1d08 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -486,7 +486,7 @@ struct TShowPipeInfo {
6: required string message
}
-struct TPipeInfo {
+struct TCreatePipeReq {
1: required string pipeName
2: required string pipeSinkName
3: required i64 startTime
@@ -893,7 +893,7 @@ service IConfigNodeRPCService {
TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
/** Create Pipe */
- common.TSStatus createPipe(TPipeInfo req)
+ common.TSStatus createPipe(TCreatePipeReq req)
/** Start Pipe */
common.TSStatus startPipe(string pipeName)