This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 34fc2237819 Pipe: degrade exception to log when executing pipe
procedure operations on DN to enhance the availability (#11623)
34fc2237819 is described below
commit 34fc22378191087cbb08e4e0a63b876a0621103a
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 28 18:14:59 2023 +0800
Pipe: degrade exception to log when executing pipe procedure operations on
DN to enhance the availability (#11623)
To enhance the availability of Pipe, an exception that occurs during the
`executeFromOperateOnDataNodes` (or `rollbackFromOperateOnDataNodes`) phase of
a create/start/stop/drop pipe procedure on the CN is now logged instead of
thrown. The procedure is considered successful even though the pipe metadata
on the DN is not yet synchronized with the CN; that metadata will be
synchronized automatically within at most one synchronization cycle.
---
.../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java | 43 ++++++++++------
.../impl/pipe/task/CreatePipeProcedureV2.java | 19 ++++---
.../impl/pipe/task/DropPipeProcedureV2.java | 8 +--
.../impl/pipe/task/StartPipeProcedureV2.java | 20 ++++----
.../impl/pipe/task/StopPipeProcedureV2.java | 19 +++----
.../PipeHistoricalDataRegionTsFileExtractor.java | 59 ++++++++++++----------
6 files changed, 96 insertions(+), 72 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index 7d183548e65..f3b363db345 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -65,7 +65,7 @@ public class IoTDBPipeExtractorIT extends AbstractPipeDualIT {
Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor", "iotdb-extractor");
- extractorAttributes.put("extractor.history.enabled", "true");
+ extractorAttributes.put("extractor.history.enable", "true");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
@@ -143,11 +143,12 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
return;
}
+ // invalid 'extractor.history.start-time'
String formatString =
String.format(
"create pipe p1"
+ " with extractor ("
- + "'extractor.history.enabled'='true',"
+ + "'extractor.history.enable'='true',"
+ "'extractor.history.start-time'=%s)"
+ " with connector ("
+ "'connector'='iotdb-thrift-connector',"
@@ -170,6 +171,29 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertEquals(0, showPipeResult.size());
+
+ // can not set 'extractor.history.enable' and
'extractor.realtime.enable' both to false
+ String sql =
+ String.format(
+ "create pipe p1"
+ + " with extractor ("
+ + "'extractor.history.enable'='false',"
+ + "'extractor.realtime.enable'='false')"
+ + " with connector ("
+ + "'connector'='iotdb-thrift-connector',"
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.batch.enable'='false')",
+ receiverIp, receiverPort);
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ fail();
+ } catch (SQLException ignored) {
+ }
+
+ showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
}
}
@@ -375,21 +399,10 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualIT {
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- extractorAttributes.put("extractor.pattern", "root.db.d1");
+ extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.history.enable", "false");
- extractorAttributes.put("extractor.realtime.enable", "false");
+ extractorAttributes.put("extractor.realtime.enable", "true");
TSStatus status =
- client.createPipe(
- new TCreatePipeReq("p1", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
- // can not set both to false
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
status.getCode());
-
- extractorAttributes.replace("extractor.pattern", "root.db.d2");
- extractorAttributes.replace("extractor.history.enable", "false");
- extractorAttributes.replace("extractor.realtime.enable", "true");
- status =
client.createPipe(
new TCreatePipeReq("p2", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 7739f202f2e..9eac9be52d3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -146,18 +146,17 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
final String pipeName = createPipeRequest.getPipeName();
LOGGER.info("CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format(
- "Failed to create pipe %s, details: %s",
- createPipeRequest.getPipeName(), exceptionMessage));
+ LOGGER.warn(
+ "Failed to create pipe {}, details: {}, metadata will be
synchronized later.",
+ createPipeRequest.getPipeName(),
+ exceptionMessage);
}
}
@@ -209,10 +208,10 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
parsePushPipeMetaExceptionForPipe(
createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format(
- "Failed to rollback create pipe %s, details: %s",
- createPipeRequest.getPipeName(), exceptionMessage));
+ LOGGER.warn(
+ "Failed to rollback create pipe {}, details: {}, metadata will be
synchronized later.",
+ createPipeRequest.getPipeName(),
+ exceptionMessage);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 3d9e226fbf9..8958597b2a4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -89,14 +89,16 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws PipeException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
dropSinglePipeOnDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format("Failed to drop pipe %s, details: %s", pipeName,
exceptionMessage));
+ LOGGER.warn(
+ "Failed to drop pipe {}, details: {}, metadata will be synchronized
later.",
+ pipeName,
+ exceptionMessage);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index d36dce384a2..5b43ac83d8d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -93,15 +93,17 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format("Failed to start pipe %s, details: %s", pipeName,
exceptionMessage));
+ LOGGER.warn(
+ "Failed to start pipe {}, details: {}, metadata will be synchronized
later.",
+ pipeName,
+ exceptionMessage);
+ return;
}
// Clear exceptions and set isStoppedByRuntimeException to false if the
pipe is
@@ -142,17 +144,17 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
// Push all pipe metas to datanode, may be time-consuming
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format(
- "Failed to rollback start pipe %s, details: %s", pipeName,
exceptionMessage));
+ LOGGER.warn(
+ "Failed to rollback start pipe {}, details: {}, metadata will be
synchronized later.",
+ pipeName,
+ exceptionMessage);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index bce57a92a21..3ac00496a00 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -93,15 +93,16 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format("Failed to stop pipe %s, details: %s", pipeName,
exceptionMessage));
+ LOGGER.warn(
+ "Failed to stop pipe {}, details: {}, metadata will be synchronized
later.",
+ pipeName,
+ exceptionMessage);
}
}
@@ -138,17 +139,17 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
// Push all pipe metas to datanode, may be time-consuming
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
if (!exceptionMessage.isEmpty()) {
- throw new PipeException(
- String.format(
- "Failed to rollback stop pipe %s, details: %s", pipeName,
exceptionMessage));
+ LOGGER.warn(
+ "Failed to rollback stop pipe {}, details: {}, metadata will be
synchronized later.",
+ pipeName,
+ exceptionMessage);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 522622bca6c..6ee4271180a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeCo
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +93,38 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
@Override
public void validate(PipeParameterValidator validator) {
- // Do nothing
+ PipeParameters parameters = validator.getParameters();
+
+ // User may set the EXTRACTOR_HISTORY_START_TIME and
EXTRACTOR_HISTORY_END_TIME without
+ // enabling the historical data extraction, which may affect the realtime
data extraction.
+ final boolean isHistoricalExtractorEnabledByUser =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+
+ try {
+ historicalDataExtractionStartTime =
+ isHistoricalExtractorEnabledByUser
+ && parameters.hasAnyAttributes(
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
+ ? DateTimeUtils.convertDatetimeStrToLong(
+ parameters.getStringByKeys(
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY),
+ ZoneId.systemDefault())
+ : Long.MIN_VALUE;
+ historicalDataExtractionEndTime =
+ isHistoricalExtractorEnabledByUser
+ && parameters.hasAnyAttributes(
+ EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
+ ? DateTimeUtils.convertDatetimeStrToLong(
+ parameters.getStringByKeys(
+ EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY),
+ ZoneId.systemDefault())
+ : Long.MAX_VALUE;
+ } catch (Exception e) {
+ // compatible with the current validation framework
+ throw new PipeParameterNotValidException(e.getMessage());
+ }
}
@Override
@@ -125,31 +157,6 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
}
- // User may set the EXTRACTOR_HISTORY_START_TIME and
EXTRACTOR_HISTORY_END_TIME without
- // enabling the historical data extraction, which may affect the realtime
data extraction.
- final boolean isHistoricalExtractorEnabledByUser =
- parameters.getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
- EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
- historicalDataExtractionStartTime =
- isHistoricalExtractorEnabledByUser
- && parameters.hasAnyAttributes(
- EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(
- EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY),
- ZoneId.systemDefault())
- : Long.MIN_VALUE;
- historicalDataExtractionEndTime =
- isHistoricalExtractorEnabledByUser
- && parameters.hasAnyAttributes(
- EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
- ? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getStringByKeys(
- EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY),
- ZoneId.systemDefault())
- : Long.MAX_VALUE;
-
// Enable historical extractor by default
historicalDataExtractionTimeLowerBound =
parameters.getBooleanOrDefault(