This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a408f9ab780 Pipe: Fix closing a schema queue while the region is removing may cause thrift requirement error (#12858)
a408f9ab780 is described below

commit a408f9ab780eb07c493a86823c57765215973abd
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 4 20:35:34 2024 +0800

    Pipe: Fix closing a schema queue while the region is removing may cause thrift requirement error (#12858)
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 35 ++++++++++----------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 37 +++++++++++-----------
 2 files changed, 36 insertions(+), 36 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 5996545461d..f83a7eac35f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -692,26 +692,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
           continue;
         }
 
-        resp.getExceptionMessages()
+        resp.getExceptionMessages().stream()
+            .filter(message -> 
pipeMetaKeeper.containsPipeMeta(message.getPipeName()))
             .forEach(
                 message -> {
-                  if (pipeMetaKeeper.containsPipeMeta(message.getPipeName())) {
-                    final PipeRuntimeMeta runtimeMeta =
-                        
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
-
-                    // Mark the status of the pipe with exception as stopped
-                    runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-                    runtimeMeta.setIsStoppedByRuntimeException(true);
-
-                    final Map<Integer, PipeRuntimeException> exceptionMap =
-                        runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
-                    if (!exceptionMap.containsKey(dataNodeId)
-                        || exceptionMap.get(dataNodeId).getTimeStamp() < 
message.getTimeStamp()) {
-                      exceptionMap.put(
-                          dataNodeId,
-                          new PipeRuntimeCriticalException(
-                              message.getMessage(), message.getTimeStamp()));
-                    }
+                  final PipeRuntimeMeta runtimeMeta =
+                      
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
+
+                  // Mark the status of the pipe with exception as stopped
+                  runtimeMeta.getStatus().set(PipeStatus.STOPPED);
+                  runtimeMeta.setIsStoppedByRuntimeException(true);
+
+                  final Map<Integer, PipeRuntimeException> exceptionMap =
+                      runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
+                  if (!exceptionMap.containsKey(dataNodeId)
+                      || exceptionMap.get(dataNodeId).getTimeStamp() < 
message.getTimeStamp()) {
+                    exceptionMap.put(
+                        dataNodeId,
+                        new PipeRuntimeCriticalException(
+                            message.getMessage(), message.getTimeStamp()));
                   }
                 });
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 56d41da1d15..ab0e8cfafef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -162,8 +162,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       LOGGER.warn(
           "Failed to clear/close the schema region listening queue, because 
{}. Will wait until success or the region's state machine is stopped.",
           e.getMessage());
+      // Use an empty pipe name rather than null so that the field can remain "required" and
+      // stay compatible with lower versions
       exceptionMessages.add(
-          new TPushPipeMetaRespExceptionMessage(null, e.getMessage(), 
System.currentTimeMillis()));
+          new TPushPipeMetaRespExceptionMessage("", e.getMessage(), 
System.currentTimeMillis()));
     }
 
     return exceptionMessages;
@@ -222,25 +224,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return;
     }
 
-    PipeDataNodeAgent.runtime()
-        .listeningSchemaRegionIds()
+    PipeDataNodeAgent.runtime().listeningSchemaRegionIds().stream()
+        .filter(
+            schemaRegionId ->
+                !validSchemaRegionIds.contains(schemaRegionId.getId())
+                    && 
PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId))
         .forEach(
             schemaRegionId -> {
-              if (!validSchemaRegionIds.contains(schemaRegionId.getId())
-                  && 
PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
-                try {
-                  SchemaRegionConsensusImpl.getInstance()
-                      .write(
-                          schemaRegionId,
-                          new PipeOperateSchemaQueueNode(new PlanNodeId(""), 
false));
-                } catch (final ConsensusException e) {
-                  throw new PipeException(
-                      "Failed to close listening queue for SchemaRegion "
-                          + schemaRegionId
-                          + ", because "
-                          + e.getMessage(),
-                      e);
-                }
+              try {
+                SchemaRegionConsensusImpl.getInstance()
+                    .write(
+                        schemaRegionId, new PipeOperateSchemaQueueNode(new 
PlanNodeId(""), false));
+              } catch (final ConsensusException e) {
+                throw new PipeException(
+                    "Failed to close listening queue for SchemaRegion "
+                        + schemaRegionId
+                        + ", because "
+                        + e.getMessage(),
+                    e);
               }
             });
   }

Reply via email to