This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 34fc2237819 Pipe: degrade exception to log when executing pipe 
procedure operations on DN to enhance the availability (#11623)
34fc2237819 is described below

commit 34fc22378191087cbb08e4e0a63b876a0621103a
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 28 18:14:59 2023 +0800

    Pipe: degrade exception to log when executing pipe procedure operations on 
DN to enhance the availability (#11623)
    
    To enhance the availability of Pipe, even if an exception occurs during the 
`executeFromOperateOnDataNodes` (`rollbackFromOperateOnDataNodes`) phase of 
procedures like create/start/stop/drop pipe on CN, where pipe metadata on DN is 
not synchronized with CN, it is considered successful. The pipe metadata will 
be automatically synchronized within at most one synchronization cycle.
---
 .../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java | 43 ++++++++++------
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 19 ++++---
 .../impl/pipe/task/DropPipeProcedureV2.java        |  8 +--
 .../impl/pipe/task/StartPipeProcedureV2.java       | 20 ++++----
 .../impl/pipe/task/StopPipeProcedureV2.java        | 19 +++----
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 59 ++++++++++++----------
 6 files changed, 96 insertions(+), 72 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index 7d183548e65..f3b363db345 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -65,7 +65,7 @@ public class IoTDBPipeExtractorIT extends AbstractPipeDualIT {
       Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("extractor", "iotdb-extractor");
-      extractorAttributes.put("extractor.history.enabled", "true");
+      extractorAttributes.put("extractor.history.enable", "true");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
@@ -143,11 +143,12 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
         return;
       }
 
+      // invalid 'extractor.history.start-time'
       String formatString =
           String.format(
               "create pipe p1"
                   + " with extractor ("
-                  + "'extractor.history.enabled'='true',"
+                  + "'extractor.history.enable'='true',"
                   + "'extractor.history.start-time'=%s)"
                   + " with connector ("
                   + "'connector'='iotdb-thrift-connector',"
@@ -170,6 +171,29 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
 
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(0, showPipeResult.size());
+
+      // can not set 'extractor.history.enable' and 
'extractor.realtime.enable' both to false
+      String sql =
+          String.format(
+              "create pipe p1"
+                  + " with extractor ("
+                  + "'extractor.history.enable'='false',"
+                  + "'extractor.realtime.enable'='false')"
+                  + " with connector ("
+                  + "'connector'='iotdb-thrift-connector',"
+                  + "'connector.ip'='%s',"
+                  + "'connector.port'='%s',"
+                  + "'connector.batch.enable'='false')",
+              receiverIp, receiverPort);
+      try (Connection connection = senderEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute(sql);
+        fail();
+      } catch (SQLException ignored) {
+      }
+
+      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(0, showPipeResult.size());
     }
   }
 
@@ -375,21 +399,10 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualIT {
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
 
-      extractorAttributes.put("extractor.pattern", "root.db.d1");
+      extractorAttributes.put("extractor.pattern", "root.db.d2");
       extractorAttributes.put("extractor.history.enable", "false");
-      extractorAttributes.put("extractor.realtime.enable", "false");
+      extractorAttributes.put("extractor.realtime.enable", "true");
       TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
-      // can not set both to false
-      Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(), 
status.getCode());
-
-      extractorAttributes.replace("extractor.pattern", "root.db.d2");
-      extractorAttributes.replace("extractor.history.enable", "false");
-      extractorAttributes.replace("extractor.realtime.enable", "true");
-      status =
           client.createPipe(
               new TCreatePipeReq("p2", connectorAttributes)
                   .setExtractorAttributes(extractorAttributes)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 7739f202f2e..9eac9be52d3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -146,18 +146,17 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     final String pipeName = createPipeRequest.getPipeName();
     LOGGER.info("CreatePipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format(
-              "Failed to create pipe %s, details: %s",
-              createPipeRequest.getPipeName(), exceptionMessage));
+      LOGGER.warn(
+          "Failed to create pipe {}, details: {}, metadata will be 
synchronized later.",
+          createPipeRequest.getPipeName(),
+          exceptionMessage);
     }
   }
 
@@ -209,10 +208,10 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         parsePushPipeMetaExceptionForPipe(
             createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format(
-              "Failed to rollback create pipe %s, details: %s",
-              createPipeRequest.getPipeName(), exceptionMessage));
+      LOGGER.warn(
+          "Failed to rollback create pipe {}, details: {}, metadata will be 
synchronized later.",
+          createPipeRequest.getPipeName(),
+          exceptionMessage);
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 3d9e226fbf9..8958597b2a4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -89,14 +89,16 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws PipeException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
dropSinglePipeOnDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format("Failed to drop pipe %s, details: %s", pipeName, 
exceptionMessage));
+      LOGGER.warn(
+          "Failed to drop pipe {}, details: {}, metadata will be synchronized 
later.",
+          pipeName,
+          exceptionMessage);
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index d36dce384a2..5b43ac83d8d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -93,15 +93,17 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format("Failed to start pipe %s, details: %s", pipeName, 
exceptionMessage));
+      LOGGER.warn(
+          "Failed to start pipe {}, details: {}, metadata will be synchronized 
later.",
+          pipeName,
+          exceptionMessage);
+      return;
     }
 
     // Clear exceptions and set isStoppedByRuntimeException to false if the 
pipe is
@@ -142,17 +144,17 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
     // Push all pipe metas to datanode, may be time-consuming
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format(
-              "Failed to rollback start pipe %s, details: %s", pipeName, 
exceptionMessage));
+      LOGGER.warn(
+          "Failed to rollback start pipe {}, details: {}, metadata will be 
synchronized later.",
+          pipeName,
+          exceptionMessage);
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index bce57a92a21..3ac00496a00 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -93,15 +93,16 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format("Failed to stop pipe %s, details: %s", pipeName, 
exceptionMessage));
+      LOGGER.warn(
+          "Failed to stop pipe {}, details: {}, metadata will be synchronized 
later.",
+          pipeName,
+          exceptionMessage);
     }
   }
 
@@ -138,17 +139,17 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
     // Push all pipe metas to datanode, may be time-consuming
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
     if (!exceptionMessage.isEmpty()) {
-      throw new PipeException(
-          String.format(
-              "Failed to rollback stop pipe %s, details: %s", pipeName, 
exceptionMessage));
+      LOGGER.warn(
+          "Failed to rollback stop pipe {}, details: {}, metadata will be 
synchronized later.",
+          pipeName,
+          exceptionMessage);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 522622bca6c..6ee4271180a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeCo
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,38 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   @Override
   public void validate(PipeParameterValidator validator) {
-    // Do nothing
+    PipeParameters parameters = validator.getParameters();
+
+    // User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
+    // enabling the historical data extraction, which may affect the realtime 
data extraction.
+    final boolean isHistoricalExtractorEnabledByUser =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+
+    try {
+      historicalDataExtractionStartTime =
+          isHistoricalExtractorEnabledByUser
+                  && parameters.hasAnyAttributes(
+                      EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
+              ? DateTimeUtils.convertDatetimeStrToLong(
+                  parameters.getStringByKeys(
+                      EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY),
+                  ZoneId.systemDefault())
+              : Long.MIN_VALUE;
+      historicalDataExtractionEndTime =
+          isHistoricalExtractorEnabledByUser
+                  && parameters.hasAnyAttributes(
+                      EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
+              ? DateTimeUtils.convertDatetimeStrToLong(
+                  parameters.getStringByKeys(
+                      EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY),
+                  ZoneId.systemDefault())
+              : Long.MAX_VALUE;
+    } catch (Exception e) {
+      // compatible with the current validation framework
+      throw new PipeParameterNotValidException(e.getMessage());
+    }
   }
 
   @Override
@@ -125,31 +157,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       }
     }
 
-    // User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
-    // enabling the historical data extraction, which may affect the realtime 
data extraction.
-    final boolean isHistoricalExtractorEnabledByUser =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
-            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
-    historicalDataExtractionStartTime =
-        isHistoricalExtractorEnabledByUser
-                && parameters.hasAnyAttributes(
-                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
-            ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getStringByKeys(
-                    EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY),
-                ZoneId.systemDefault())
-            : Long.MIN_VALUE;
-    historicalDataExtractionEndTime =
-        isHistoricalExtractorEnabledByUser
-                && parameters.hasAnyAttributes(
-                    EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
-            ? DateTimeUtils.convertDatetimeStrToLong(
-                parameters.getStringByKeys(
-                    EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY),
-                ZoneId.systemDefault())
-            : Long.MAX_VALUE;
-
     // Enable historical extractor by default
     historicalDataExtractionTimeLowerBound =
         parameters.getBooleanOrDefault(

Reply via email to