This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ca47c7c539 Pipe: Fixed the bug that CN cannot sense a drop pipe failure in meta sync, which may lead to the drop pipe being constantly skipped (#12059)
4ca47c7c539 is described below

commit 4ca47c7c53904747d2dd011494f7daf00b198511
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 21 18:31:53 2024 +0800

    Pipe: Fixed the bug that CN cannot sense a drop pipe failure in meta sync, which may lead to the drop pipe being constantly skipped (#12059)
---
 integration-test/pom.xml                                   | 14 +++++++-------
 .../java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java  |  7 ++++---
 .../iotdb/confignode/persistence/pipe/PipeTaskInfo.java    |  4 +---
 .../iotdb/commons/pipe/agent/task/PipeTaskAgent.java       |  8 +++++++-
 4 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 678347f8039..420ccfeeb44 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -29,13 +29,6 @@
     <artifactId>integration-test</artifactId>
     <name>IoTDB: Integration-Test</name>
     <properties>
-        
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
-        
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
-        
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
-        
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
-        
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
-        
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
-        
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
         <integrationTest.excludedGroups/>
         <integrationTest.forkCount>1</integrationTest.forkCount>
         <integrationTest.includedGroups/>
@@ -60,6 +53,13 @@
         
<scalableSingleNodeMode.dataRegionReplicaNumber>1</scalableSingleNodeMode.dataRegionReplicaNumber>
         
<scalableSingleNodeMode.schemaRegionConsensus>Ratis</scalableSingleNodeMode.schemaRegionConsensus>
         
<scalableSingleNodeMode.schemaRegionReplicaNumber>1</scalableSingleNodeMode.schemaRegionReplicaNumber>
+        
<highPerformanceMode.configNodeConsensus>Ratis</highPerformanceMode.configNodeConsensus>
+        
<highPerformanceMode.configNodeNumber>1</highPerformanceMode.configNodeNumber>
+        
<highPerformanceMode.dataNodeNumber>3</highPerformanceMode.dataNodeNumber>
+        
<highPerformanceMode.dataRegionConsensus>IoT</highPerformanceMode.dataRegionConsensus>
+        
<highPerformanceMode.dataRegionReplicaNumber>2</highPerformanceMode.dataRegionReplicaNumber>
+        
<highPerformanceMode.schemaRegionConsensus>Ratis</highPerformanceMode.schemaRegionConsensus>
+        
<highPerformanceMode.schemaRegionReplicaNumber>3</highPerformanceMode.schemaRegionReplicaNumber>
         
<strongConsistencyClusterMode.configNodeConsensus>Ratis</strongConsistencyClusterMode.configNodeConsensus>
         
<strongConsistencyClusterMode.configNodeNumber>3</strongConsistencyClusterMode.configNodeNumber>
         
<strongConsistencyClusterMode.dataNodeNumber>3</strongConsistencyClusterMode.dataNodeNumber>
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 0639497a6b1..b0b58ce56d9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -722,7 +722,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
                               .setExtractorAttributes(extractorAttributes)
                               .setProcessorAttributes(processorAttributes));
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                    successCount.updateAndGet(v -> v + 1);
+                    successCount.incrementAndGet();
                   }
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
@@ -752,7 +752,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
                     (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
                   TSStatus status = client.dropPipe("p1");
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                    successCount.updateAndGet(v -> v + 1);
+                    successCount.incrementAndGet();
                   }
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
@@ -771,7 +771,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
       t.join();
     }
 
-    Assert.assertEquals(10, successCount.get());
+    // Assert at least 1 drop operation succeeds
+    Assert.assertTrue(successCount.get() >= 1);
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 10085a49357..23d82ab5a8d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -290,9 +290,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   private void checkBeforeDropPipeInternal(String pipeName) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
-          "Check before drop pipe {}, pipe exists: {}.",
-          pipeName,
-          isPipeExisted(pipeName) ? "true" : "false");
+          "Check before drop pipe {}, pipe exists: {}.", pipeName, 
isPipeExisted(pipeName));
     }
     // No matter whether the pipe exists, we allow the drop operation executed 
on all nodes to
     // ensure the consistency.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index d4c18a36213..4945edd57c2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -358,8 +358,14 @@ public abstract class PipeTaskAgent {
           dropPipe(metaInAgent.getStaticMeta().getPipeName());
         }
       } catch (Exception e) {
-        // Do not record the error messages for the pipes don't exist on 
coordinator.
+        // Report the exception message for CN to sense the failure of meta 
sync
+        final String errorMessage =
+            String.format(
+                "Failed to handle pipe meta changes for %s, because %s", 
pipeName, e.getMessage());
         LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+        exceptionMessages.add(
+            new TPushPipeMetaRespExceptionMessage(
+                pipeName, errorMessage, System.currentTimeMillis()));
       }
     }
 

Reply via email to