This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a408f9ab780 Pipe: Fix thrift required-field error that may occur when
closing a schema queue while the region is being removed (#12858)
a408f9ab780 is described below
commit a408f9ab780eb07c493a86823c57765215973abd
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 4 20:35:34 2024 +0800
Pipe: Fix thrift required-field error that may occur when closing a schema
queue while the region is being removed (#12858)
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 35 ++++++++++----------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 37 +++++++++++-----------
2 files changed, 36 insertions(+), 36 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 5996545461d..f83a7eac35f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -692,26 +692,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
continue;
}
- resp.getExceptionMessages()
+ resp.getExceptionMessages().stream()
+ .filter(message ->
pipeMetaKeeper.containsPipeMeta(message.getPipeName()))
.forEach(
message -> {
- if (pipeMetaKeeper.containsPipeMeta(message.getPipeName())) {
- final PipeRuntimeMeta runtimeMeta =
-
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
-
- // Mark the status of the pipe with exception as stopped
- runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.setIsStoppedByRuntimeException(true);
-
- final Map<Integer, PipeRuntimeException> exceptionMap =
- runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
- if (!exceptionMap.containsKey(dataNodeId)
- || exceptionMap.get(dataNodeId).getTimeStamp() <
message.getTimeStamp()) {
- exceptionMap.put(
- dataNodeId,
- new PipeRuntimeCriticalException(
- message.getMessage(), message.getTimeStamp()));
- }
+ final PipeRuntimeMeta runtimeMeta =
+
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
+
+ // Mark the status of the pipe with exception as stopped
+ runtimeMeta.getStatus().set(PipeStatus.STOPPED);
+ runtimeMeta.setIsStoppedByRuntimeException(true);
+
+ final Map<Integer, PipeRuntimeException> exceptionMap =
+ runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
+ if (!exceptionMap.containsKey(dataNodeId)
+ || exceptionMap.get(dataNodeId).getTimeStamp() <
message.getTimeStamp()) {
+ exceptionMap.put(
+ dataNodeId,
+ new PipeRuntimeCriticalException(
+ message.getMessage(), message.getTimeStamp()));
}
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 56d41da1d15..ab0e8cfafef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -162,8 +162,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
LOGGER.warn(
"Failed to clear/close the schema region listening queue, because
{}. Will wait until success or the region's state machine is stopped.",
e.getMessage());
+ // Do not use null pipe name to retain the field "required" to be
compatible with the lower
+ // versions
exceptionMessages.add(
- new TPushPipeMetaRespExceptionMessage(null, e.getMessage(),
System.currentTimeMillis()));
+ new TPushPipeMetaRespExceptionMessage("", e.getMessage(),
System.currentTimeMillis()));
}
return exceptionMessages;
@@ -222,25 +224,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return;
}
- PipeDataNodeAgent.runtime()
- .listeningSchemaRegionIds()
+ PipeDataNodeAgent.runtime().listeningSchemaRegionIds().stream()
+ .filter(
+ schemaRegionId ->
+ !validSchemaRegionIds.contains(schemaRegionId.getId())
+ &&
PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId))
.forEach(
schemaRegionId -> {
- if (!validSchemaRegionIds.contains(schemaRegionId.getId())
- &&
PipeDataNodeAgent.runtime().isSchemaLeaderReady(schemaRegionId)) {
- try {
- SchemaRegionConsensusImpl.getInstance()
- .write(
- schemaRegionId,
- new PipeOperateSchemaQueueNode(new PlanNodeId(""),
false));
- } catch (final ConsensusException e) {
- throw new PipeException(
- "Failed to close listening queue for SchemaRegion "
- + schemaRegionId
- + ", because "
- + e.getMessage(),
- e);
- }
+ try {
+ SchemaRegionConsensusImpl.getInstance()
+ .write(
+ schemaRegionId, new PipeOperateSchemaQueueNode(new
PlanNodeId(""), false));
+ } catch (final ConsensusException e) {
+ throw new PipeException(
+ "Failed to close listening queue for SchemaRegion "
+ + schemaRegionId
+ + ", because "
+ + e.getMessage(),
+ e);
}
});
}