This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4ca47c7c539 Pipe: Fixed the bug that CN cannot sense drop pipe failure
in meta sync, which may cause drop pipe to be constantly skipped (#12059)
4ca47c7c539 is described below
commit 4ca47c7c53904747d2dd011494f7daf00b198511
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 21 18:31:53 2024 +0800
Pipe: Fixed the bug that CN cannot sense drop pipe failure in meta sync,
which may cause drop pipe to be constantly skipped (#12059)
---
integration-test/pom.xml | 14 +++++++-------
.../java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 7 ++++---
.../iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 4 +---
.../iotdb/commons/pipe/agent/task/PipeTaskAgent.java | 8 +++++++-
4 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 678347f8039..420ccfeeb44 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -29,13 +29,6 @@
<artifactId>integration-test</artifactId>
<name>IoTDB: Integration-Test</name>
<properties>
-
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
-
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
-
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
-
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
-
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
-
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
-
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
<integrationTest.excludedGroups/>
<integrationTest.forkCount>1</integrationTest.forkCount>
<integrationTest.includedGroups/>
@@ -60,6 +53,13 @@
<scalableSingleNodeMode.dataRegionReplicaNumber>1</scalableSingleNodeMode.dataRegionReplicaNumber>
<scalableSingleNodeMode.schemaRegionConsensus>Ratis</scalableSingleNodeMode.schemaRegionConsensus>
<scalableSingleNodeMode.schemaRegionReplicaNumber>1</scalableSingleNodeMode.schemaRegionReplicaNumber>
+
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
+
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
+
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
+
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
+
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
+
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
+
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
<strongConsistencyClusterMode.configNodeConsensus>Ratis</strongConsistencyClusterMode.configNodeConsensus>
<strongConsistencyClusterMode.configNodeNumber>3</strongConsistencyClusterMode.configNodeNumber>
<strongConsistencyClusterMode.dataNodeNumber>3</strongConsistencyClusterMode.dataNodeNumber>
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 0639497a6b1..b0b58ce56d9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -722,7 +722,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successCount.updateAndGet(v -> v + 1);
+ successCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -752,7 +752,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
TSStatus status = client.dropPipe("p1");
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- successCount.updateAndGet(v -> v + 1);
+ successCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -771,7 +771,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
t.join();
}
- Assert.assertEquals(10, successCount.get());
+ // Assert at least 1 drop operation succeeds
+ Assert.assertTrue(successCount.get() >= 1);
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 10085a49357..23d82ab5a8d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -290,9 +290,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
private void checkBeforeDropPipeInternal(String pipeName) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "Check before drop pipe {}, pipe exists: {}.",
- pipeName,
- isPipeExisted(pipeName) ? "true" : "false");
+ "Check before drop pipe {}, pipe exists: {}.", pipeName,
isPipeExisted(pipeName));
}
// No matter whether the pipe exists, we allow the drop operation executed
on all nodes to
// ensure the consistency.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index d4c18a36213..4945edd57c2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -358,8 +358,14 @@ public abstract class PipeTaskAgent {
dropPipe(metaInAgent.getStaticMeta().getPipeName());
}
} catch (Exception e) {
- // Do not record the error messages for the pipes don't exist on
coordinator.
+ // Report the exception message for CN to sense the failure of meta
sync
+ final String errorMessage =
+ String.format(
+ "Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+ exceptionMessages.add(
+ new TPushPipeMetaRespExceptionMessage(
+ pipeName, errorMessage, System.currentTimeMillis()));
}
}