This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 214d8d2d77d Pipe: fix pipe plugin instance not closed issue after
failed initialization (#12509)
214d8d2d77d is described below
commit 214d8d2d77dacc4265c3eb9ef6df4ae276ee4a69
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat May 11 16:45:59 2024 +0800
Pipe: fix pipe plugin instance not closed issue after failed initialization
(#12509)
---
.../pipe/execution/PipeConfigNodeSubtask.java | 64 +++++++++++++++-------
.../db/pipe/task/stage/PipeTaskExtractorStage.java | 13 +++++
.../connector/PipeConnectorSubtaskManager.java | 13 +++++
.../SubscriptionConnectorSubtaskManager.java | 14 +++++
.../commons/pipe/agent/plugin/PipePluginAgent.java | 10 +++-
5 files changed, 93 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 6785e456103..31c058aa058 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -80,15 +80,27 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
// 1. Construct extractor
extractor =
PipeConfigNodeAgent.plugin().reflectExtractor(extractorParameters);
- // 2. Validate extractor parameters
- extractor.validate(new PipeParameterValidator(extractorParameters));
-
- // 3. Customize extractor
- final PipeTaskRuntimeConfiguration runtimeConfiguration =
- new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
- taskID, creationTime, CONFIG_REGION_ID.getId(), pipeTaskMeta));
- extractor.customize(extractorParameters, runtimeConfiguration);
+ try {
+ // 2. Validate extractor parameters
+ extractor.validate(new PipeParameterValidator(extractorParameters));
+
+ // 3. Customize extractor
+ final PipeTaskRuntimeConfiguration runtimeConfiguration =
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskExtractorRuntimeEnvironment(
+ taskID, creationTime, CONFIG_REGION_ID.getId(),
pipeTaskMeta));
+ extractor.customize(extractorParameters, runtimeConfiguration);
+ } catch (final Exception e) {
+ try {
+ extractor.close();
+ } catch (Exception closeException) {
+ LOGGER.warn(
+ "Failed to close extractor after failed to initialize extractor. "
+ + "Ignore this exception.",
+ closeException);
+ }
+ throw e;
+ }
}
private void initProcessor(Map<String, String> processorAttributes) {
@@ -114,17 +126,29 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
// 1. Construct connector
outputPipeConnector =
PipeConfigNodeAgent.plugin().reflectConnector(connectorParameters);
- // 2. Validate connector parameters
- outputPipeConnector.validate(new
PipeParameterValidator(connectorParameters));
-
- // 3. Customize connector
- final PipeTaskRuntimeConfiguration runtimeConfiguration =
- new PipeTaskRuntimeConfiguration(
- new PipeTaskRuntimeEnvironment(taskID, creationTime,
CONFIG_REGION_ID.getId()));
- outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
-
- // 4. Handshake
- outputPipeConnector.handshake();
+ try {
+ // 2. Validate connector parameters
+ outputPipeConnector.validate(new
PipeParameterValidator(connectorParameters));
+
+ // 3. Customize connector
+ final PipeTaskRuntimeConfiguration runtimeConfiguration =
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskRuntimeEnvironment(taskID, creationTime,
CONFIG_REGION_ID.getId()));
+ outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
+
+ // 4. Handshake
+ outputPipeConnector.handshake();
+ } catch (final Exception e) {
+ try {
+ outputPipeConnector.close();
+ } catch (final Exception closeException) {
+ LOGGER.warn(
+ "Failed to close connector after failed to initialize connector. "
+ + "Ignore this exception.",
+ closeException);
+ }
+ throw e;
+ }
}
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index fa00d8ad637..06206b03bff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -32,8 +32,13 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class PipeTaskExtractorStage extends PipeTaskStage {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskExtractorStage.class);
+
private final PipeExtractor pipeExtractor;
public PipeTaskExtractorStage(
@@ -60,6 +65,14 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
pipeName, creationTime, regionId, pipeTaskMeta));
pipeExtractor.customize(extractorParameters, runtimeConfiguration);
} catch (Exception e) {
+ try {
+ pipeExtractor.close();
+ } catch (Exception closeException) {
+ LOGGER.warn(
+ "Failed to close extractor after failed to initialize extractor. "
+ + "Ignore this exception.",
+ closeException);
+ }
throw new PipeException(e.getMessage(), e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 469a7cc67f1..19fddd0c70d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -38,6 +38,9 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,6 +50,8 @@ import java.util.TreeMap;
public class PipeConnectorSubtaskManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtaskManager.class);
+
private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
"Failed to deregister PipeConnectorSubtask. No such subtask: ";
@@ -117,6 +122,14 @@ public class PipeConnectorSubtaskManager {
pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
} catch (final Exception e) {
+ try {
+ pipeConnector.close();
+ } catch (final Exception closeException) {
+ LOGGER.warn(
+ "Failed to close connector after failed to initialize
connector. "
+ + "Ignore this exception.",
+ closeException);
+ }
throw new PipeException(
"Failed to construct PipeConnector, because of " +
e.getMessage(), e);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 6ac8e70a16d..8f2cbd613d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -39,6 +39,9 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -49,6 +52,9 @@ import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.SUB
public class SubscriptionConnectorSubtaskManager {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionConnectorSubtaskManager.class);
+
private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
"Failed to deregister PipeConnectorSubtask. No such subtask: ";
@@ -99,6 +105,14 @@ public class SubscriptionConnectorSubtaskManager {
pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
} catch (final Exception e) {
+ try {
+ pipeConnector.close();
+ } catch (final Exception closeException) {
+ LOGGER.warn(
+ "Failed to close connector after failed to initialize connector.
"
+ + "Ignore this exception.",
+ closeException);
+ }
throw new PipeException(
"Failed to construct PipeConnector, because of " + e.getMessage(),
e);
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 3e1b123ec9b..eba03711535 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -190,9 +190,17 @@ public abstract class PipePluginAgent {
try {
processor.validate(new PipeParameterValidator(replacedParameters));
processor.customize(replacedParameters, runtimeConfigurations);
+ return processor;
} catch (Exception e) {
+ try {
+ processor.close();
+ } catch (Exception closeException) {
+ LOGGER.warn(
+ "Failed to close processor after failed to initialize processor. "
+ + "Ignore this exception.",
+ closeException);
+ }
throw new PipeException(e.getMessage(), e);
}
- return processor;
}
}