This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-memory-leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5d01bb69d7433b006cefa4e4a7415d7b861c98b6 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat May 11 15:14:00 2024 +0800 Pipe: fix pipe plugin instance not closed issue after failed initialization --- .../pipe/execution/PipeConfigNodeSubtask.java | 64 +++++++++++++++------- .../db/pipe/task/stage/PipeTaskExtractorStage.java | 13 +++++ .../connector/PipeConnectorSubtaskManager.java | 13 +++++ .../SubscriptionConnectorSubtaskManager.java | 14 +++++ .../commons/pipe/agent/plugin/PipePluginAgent.java | 10 +++- 5 files changed, 93 insertions(+), 21 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java index 6785e456103..31c058aa058 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java @@ -80,15 +80,27 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { // 1. Construct extractor extractor = PipeConfigNodeAgent.plugin().reflectExtractor(extractorParameters); - // 2. Validate extractor parameters - extractor.validate(new PipeParameterValidator(extractorParameters)); - - // 3. Customize extractor - final PipeTaskRuntimeConfiguration runtimeConfiguration = - new PipeTaskRuntimeConfiguration( - new PipeTaskExtractorRuntimeEnvironment( - taskID, creationTime, CONFIG_REGION_ID.getId(), pipeTaskMeta)); - extractor.customize(extractorParameters, runtimeConfiguration); + try { + // 2. Validate extractor parameters + extractor.validate(new PipeParameterValidator(extractorParameters)); + + // 3. Customize extractor + final PipeTaskRuntimeConfiguration runtimeConfiguration = + new PipeTaskRuntimeConfiguration( + new PipeTaskExtractorRuntimeEnvironment( + taskID, creationTime, CONFIG_REGION_ID.getId(), pipeTaskMeta)); + extractor.customize(extractorParameters, runtimeConfiguration); + } catch (final Exception e) { + try { + extractor.close(); + } catch (Exception closeException) { + LOGGER.warn( + "Failed to close extractor after failed to initialize extractor. " + + "Ignore this exception.", + closeException); + } + throw e; + } } private void initProcessor(Map<String, String> processorAttributes) { @@ -114,17 +126,29 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { // 1. Construct connector outputPipeConnector = PipeConfigNodeAgent.plugin().reflectConnector(connectorParameters); - // 2. Validate connector parameters - outputPipeConnector.validate(new PipeParameterValidator(connectorParameters)); - - // 3. Customize connector - final PipeTaskRuntimeConfiguration runtimeConfiguration = - new PipeTaskRuntimeConfiguration( - new PipeTaskRuntimeEnvironment(taskID, creationTime, CONFIG_REGION_ID.getId())); - outputPipeConnector.customize(connectorParameters, runtimeConfiguration); - - // 4. Handshake - outputPipeConnector.handshake(); + try { + // 2. Validate connector parameters + outputPipeConnector.validate(new PipeParameterValidator(connectorParameters)); + + // 3. Customize connector + final PipeTaskRuntimeConfiguration runtimeConfiguration = + new PipeTaskRuntimeConfiguration( + new PipeTaskRuntimeEnvironment(taskID, creationTime, CONFIG_REGION_ID.getId())); + outputPipeConnector.customize(connectorParameters, runtimeConfiguration); + + // 4. Handshake + outputPipeConnector.handshake(); + } catch (final Exception e) { + try { + outputPipeConnector.close(); + } catch (final Exception closeException) { + LOGGER.warn( + "Failed to close connector after failed to initialize connector. " + + "Ignore this exception.", + closeException); + } + throw e; + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java index fa00d8ad637..06206b03bff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java @@ -32,8 +32,13 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class PipeTaskExtractorStage extends PipeTaskStage { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskExtractorStage.class); + private final PipeExtractor pipeExtractor; public PipeTaskExtractorStage( @@ -60,6 +65,14 @@ public class PipeTaskExtractorStage extends PipeTaskStage { pipeName, creationTime, regionId, pipeTaskMeta)); pipeExtractor.customize(extractorParameters, runtimeConfiguration); } catch (Exception e) { + try { + pipeExtractor.close(); + } catch (Exception closeException) { + LOGGER.warn( + "Failed to close extractor after failed to initialize extractor. " + + "Ignore this exception.", + closeException); + } throw new PipeException(e.getMessage(), e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index 469a7cc67f1..19fddd0c70d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -38,6 +38,9 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -47,6 +50,8 @@ import java.util.TreeMap; public class PipeConnectorSubtaskManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtaskManager.class); + private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE = "Failed to deregister PipeConnectorSubtask. No such subtask: "; @@ -117,6 +122,14 @@ public class PipeConnectorSubtaskManager { pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment)); pipeConnector.handshake(); } catch (final Exception e) { + try { + pipeConnector.close(); + } catch (final Exception closeException) { + LOGGER.warn( + "Failed to close connector after failed to initialize connector. " + + "Ignore this exception.", + closeException); + } throw new PipeException( "Failed to construct PipeConnector, because of " + e.getMessage(), e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java index 6ac8e70a16d..8f2cbd613d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java @@ -39,6 +39,9 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -49,6 +52,9 @@ import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.SUB public class SubscriptionConnectorSubtaskManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(SubscriptionConnectorSubtaskManager.class); + private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE = "Failed to deregister PipeConnectorSubtask. No such subtask: "; @@ -99,6 +105,14 @@ public class SubscriptionConnectorSubtaskManager { pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment)); pipeConnector.handshake(); } catch (final Exception e) { + try { + pipeConnector.close(); + } catch (final Exception closeException) { + LOGGER.warn( + "Failed to close connector after failed to initialize connector. " + + "Ignore this exception.", + closeException); + } throw new PipeException( "Failed to construct PipeConnector, because of " + e.getMessage(), e); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java index 3e1b123ec9b..eba03711535 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java @@ -190,9 +190,17 @@ public abstract class PipePluginAgent { try { processor.validate(new PipeParameterValidator(replacedParameters)); processor.customize(replacedParameters, runtimeConfigurations); + return processor; } catch (Exception e) { + try { + processor.close(); + } catch (Exception closeException) { + LOGGER.warn( + "Failed to close processor after failed to initialize processor. " + + "Ignore this exception.", + closeException); + } throw new PipeException(e.getMessage(), e); } - return processor; } }
