This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 2dd2f0f76cd [IOTDB-5976] Pipe: optimized the feedback when pipe procedure failed to pushPipeMetas (#10084)
2dd2f0f76cd is described below

commit 2dd2f0f76cdfdc12ef2ac0d10976101860840c33
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 9 00:34:34 2023 +0800

    [IOTDB-5976] Pipe: optimized the feedback when pipe procedure failed to pushPipeMetas (#10084)
---
 .../impl/pipe/AbstractOperatePipeProcedureV2.java      |  9 +++------
 .../impl/pipe/task/CreatePipeProcedureV2.java          | 18 ++++++++++++++++--
 .../procedure/impl/pipe/task/DropPipeProcedureV2.java  |  9 ++++++++-
 .../procedure/impl/pipe/task/StartPipeProcedureV2.java | 18 ++++++++++++++++--
 .../procedure/impl/pipe/task/StopPipeProcedureV2.java  | 17 +++++++++++++++--
 .../thrift/impl/DataNodeInternalRPCServiceImpl.java    |  9 +++++++--
 6 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 200a71e8f6f..e48901b7824 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.pipe;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -27,7 +28,6 @@ import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -181,7 +181,7 @@ public abstract class AbstractOperatePipeProcedureV2
     return OperatePipeTaskState.VALIDATE_TASK;
   }
 
-  protected void pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+  protected TSStatus pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env) throws IOException {
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     for (PipeMeta pipeMeta :
         env.getConfigManager()
@@ -192,10 +192,7 @@ public abstract class AbstractOperatePipeProcedureV2
       pipeMetaBinaryList.add(pipeMeta.serialize());
     }
 
-    if (RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList)).getCode()
-        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException("Failed to push pipe meta list to data nodes");
-    }
+    return RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList));
   }
 
   protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 297b1c97a79..0ed89dfe536 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -34,6 +35,7 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -130,7 +132,13 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
         createPipeRequest.getPipeName());
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to create pipe %s on data nodes. Failures: %s",
+              pipeStaticMeta.getPipeName(), result.getMessage()));
+    }
   }
 
   @Override
@@ -169,7 +177,13 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         "CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
         createPipeRequest.getPipeName());
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to rollback create pipe %s on data nodes. Failures: %s",
+              pipeStaticMeta.getPipeName(), result.getMessage()));
+    }
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index bae85b9bf74..df89bba9f30 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
@@ -25,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -88,7 +90,12 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to drop pipe %s on data nodes. Failures: %s", pipeName, result.getMessage()));
+    }
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index ef277a4fcc8..a7e7ba7d88e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -26,6 +27,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -91,7 +93,13 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to start pipe %s on data nodes. Failures: %s",
+              pipeName, result.getMessage()));
+    }
   }
 
   @Override
@@ -124,7 +132,13 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to rollback start pipe %s on data nodes. Failures: %s",
+              pipeName, result.getMessage()));
+    }
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 7c98678a608..501640ebc72 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -26,6 +27,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -91,7 +93,12 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to stop pipe %s on data nodes. Failures: %s", pipeName, result.getMessage()));
+    }
   }
 
   @Override
@@ -124,7 +131,13 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
 
-    pushPipeMetaToDataNodes(env);
+    TSStatus result = pushPipeMetaToDataNodes(env);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Failed to rollback stop pipe %s on data nodes. Failures: %s",
+              pipeName, result.getMessage()));
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 0863b58d1f5..ba86131b29c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -899,8 +899,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     for (ByteBuffer byteBuffer : req.getPipeMetas()) {
       pipeMetas.add(PipeMeta.deserialize(byteBuffer));
     }
-    PipeAgent.task().handlePipeMetaChanges(pipeMetas);
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    try {
+      PipeAgent.task().handlePipeMetaChanges(pipeMetas);
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (Exception e) {
+      LOGGER.error("Error occurred when pushing pipe meta", e);
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+    }
   }
 
   private TSStatus executeInternalSchemaTask(

Reply via email to