This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 2dd2f0f76cd [IOTDB-5976] Pipe: optimized the feedback when pipe
procedure failed to pushPipeMetas (#10084)
2dd2f0f76cd is described below
commit 2dd2f0f76cdfdc12ef2ac0d10976101860840c33
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 9 00:34:34 2023 +0800
[IOTDB-5976] Pipe: optimized the feedback when pipe procedure failed to
pushPipeMetas (#10084)
---
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 9 +++------
.../impl/pipe/task/CreatePipeProcedureV2.java | 18 ++++++++++++++++--
.../procedure/impl/pipe/task/DropPipeProcedureV2.java | 9 ++++++++-
.../procedure/impl/pipe/task/StartPipeProcedureV2.java | 18 ++++++++++++++++--
.../procedure/impl/pipe/task/StopPipeProcedureV2.java | 17 +++++++++++++++--
.../thrift/impl/DataNodeInternalRPCServiceImpl.java | 9 +++++++--
6 files changed, 65 insertions(+), 15 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 200a71e8f6f..e48901b7824 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.procedure.impl.pipe;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -27,7 +28,6 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -181,7 +181,7 @@ public abstract class AbstractOperatePipeProcedureV2
return OperatePipeTaskState.VALIDATE_TASK;
}
- protected void pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env) throws
IOException {
+ protected TSStatus pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
for (PipeMeta pipeMeta :
env.getConfigManager()
@@ -192,10 +192,7 @@ public abstract class AbstractOperatePipeProcedureV2
pipeMetaBinaryList.add(pipeMeta.serialize());
}
- if
(RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList)).getCode()
- != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException("Failed to push pipe meta list to data nodes");
- }
+ return
RpcUtils.squashResponseStatusList(env.pushPipeMetaToDataNodes(pipeMetaBinaryList));
}
protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv
env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 297b1c97a79..0ed89dfe536 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -34,6 +35,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -130,7 +132,13 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
"CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
createPipeRequest.getPipeName());
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to create pipe %s on data nodes. Failures: %s",
+ pipeStaticMeta.getPipeName(), result.getMessage()));
+ }
}
@Override
@@ -169,7 +177,13 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
"CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
createPipeRequest.getPipeName());
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to rollback create pipe %s on data nodes. Failures: %s",
+ pipeStaticMeta.getPipeName(), result.getMessage()));
+ }
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index bae85b9bf74..df89bba9f30 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
@@ -25,6 +26,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -88,7 +90,12 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to drop pipe %s on data nodes. Failures: %s", pipeName,
result.getMessage()));
+ }
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index ef277a4fcc8..a7e7ba7d88e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -26,6 +27,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -91,7 +93,13 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to start pipe %s on data nodes. Failures: %s",
+ pipeName, result.getMessage()));
+ }
}
@Override
@@ -124,7 +132,13 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to rollback start pipe %s on data nodes. Failures: %s",
+ pipeName, result.getMessage()));
+ }
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 7c98678a608..501640ebc72 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -26,6 +27,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -91,7 +93,12 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to stop pipe %s on data nodes. Failures: %s", pipeName,
result.getMessage()));
+ }
}
@Override
@@ -124,7 +131,13 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
- pushPipeMetaToDataNodes(env);
+ TSStatus result = pushPipeMetaToDataNodes(env);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Failed to rollback stop pipe %s on data nodes. Failures: %s",
+ pipeName, result.getMessage()));
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 0863b58d1f5..ba86131b29c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -899,8 +899,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
pipeMetas.add(PipeMeta.deserialize(byteBuffer));
}
- PipeAgent.task().handlePipeMetaChanges(pipeMetas);
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ try {
+ PipeAgent.task().handlePipeMetaChanges(pipeMetas);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (Exception e) {
+ LOGGER.error("Error occurred when pushing pipe meta", e);
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
}
private TSStatus executeInternalSchemaTask(