This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-memory-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d01bb69d7433b006cefa4e4a7415d7b861c98b6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat May 11 15:14:00 2024 +0800

    Pipe: fix pipe plugin instance not closed issue after failed initialization
---
 .../pipe/execution/PipeConfigNodeSubtask.java      | 64 +++++++++++++++-------
 .../db/pipe/task/stage/PipeTaskExtractorStage.java | 13 +++++
 .../connector/PipeConnectorSubtaskManager.java     | 13 +++++
 .../SubscriptionConnectorSubtaskManager.java       | 14 +++++
 .../commons/pipe/agent/plugin/PipePluginAgent.java | 10 +++-
 5 files changed, 93 insertions(+), 21 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 6785e456103..31c058aa058 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -80,15 +80,27 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
     // 1. Construct extractor
     extractor = PipeConfigNodeAgent.plugin().reflectExtractor(extractorParameters);
 
-    // 2. Validate extractor parameters
-    extractor.validate(new PipeParameterValidator(extractorParameters));
-
-    // 3. Customize extractor
-    final PipeTaskRuntimeConfiguration runtimeConfiguration =
-        new PipeTaskRuntimeConfiguration(
-            new PipeTaskExtractorRuntimeEnvironment(
-                taskID, creationTime, CONFIG_REGION_ID.getId(), pipeTaskMeta));
-    extractor.customize(extractorParameters, runtimeConfiguration);
+    try {
+      // 2. Validate extractor parameters
+      extractor.validate(new PipeParameterValidator(extractorParameters));
+
+      // 3. Customize extractor
+      final PipeTaskRuntimeConfiguration runtimeConfiguration =
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskExtractorRuntimeEnvironment(
+                  taskID, creationTime, CONFIG_REGION_ID.getId(), pipeTaskMeta));
+      extractor.customize(extractorParameters, runtimeConfiguration);
+    } catch (final Exception e) {
+      try {
+        extractor.close();
+      } catch (Exception closeException) {
+        LOGGER.warn(
+            "Failed to close extractor after failed to initialize extractor. "
+                + "Ignore this exception.",
+            closeException);
+      }
+      throw e;
+    }
   }
 
   private void initProcessor(Map<String, String> processorAttributes) {
@@ -114,17 +126,29 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
     // 1. Construct connector
     outputPipeConnector = PipeConfigNodeAgent.plugin().reflectConnector(connectorParameters);
 
-    // 2. Validate connector parameters
-    outputPipeConnector.validate(new PipeParameterValidator(connectorParameters));
-
-    // 3. Customize connector
-    final PipeTaskRuntimeConfiguration runtimeConfiguration =
-        new PipeTaskRuntimeConfiguration(
-            new PipeTaskRuntimeEnvironment(taskID, creationTime, CONFIG_REGION_ID.getId()));
-    outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
-
-    // 4. Handshake
-    outputPipeConnector.handshake();
+    try {
+      // 2. Validate connector parameters
+      outputPipeConnector.validate(new PipeParameterValidator(connectorParameters));
+
+      // 3. Customize connector
+      final PipeTaskRuntimeConfiguration runtimeConfiguration =
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskRuntimeEnvironment(taskID, creationTime, CONFIG_REGION_ID.getId()));
+      outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
+
+      // 4. Handshake
+      outputPipeConnector.handshake();
+    } catch (final Exception e) {
+      try {
+        outputPipeConnector.close();
+      } catch (final Exception closeException) {
+        LOGGER.warn(
+            "Failed to close connector after failed to initialize connector. "
+                + "Ignore this exception.",
+            closeException);
+      }
+      throw e;
+    }
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index fa00d8ad637..06206b03bff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -32,8 +32,13 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class PipeTaskExtractorStage extends PipeTaskStage {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskExtractorStage.class);
+
   private final PipeExtractor pipeExtractor;
 
   public PipeTaskExtractorStage(
@@ -60,6 +65,14 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
                   pipeName, creationTime, regionId, pipeTaskMeta));
       pipeExtractor.customize(extractorParameters, runtimeConfiguration);
     } catch (Exception e) {
+      try {
+        pipeExtractor.close();
+      } catch (Exception closeException) {
+        LOGGER.warn(
+            "Failed to close extractor after failed to initialize extractor. "
+                + "Ignore this exception.",
+            closeException);
+      }
       throw new PipeException(e.getMessage(), e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 469a7cc67f1..19fddd0c70d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -38,6 +38,9 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -47,6 +50,8 @@ import java.util.TreeMap;
 
 public class PipeConnectorSubtaskManager {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtaskManager.class);
+
   private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
       "Failed to deregister PipeConnectorSubtask. No such subtask: ";
 
@@ -117,6 +122,14 @@ public class PipeConnectorSubtaskManager {
               pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));
           pipeConnector.handshake();
         } catch (final Exception e) {
+          try {
+            pipeConnector.close();
+          } catch (final Exception closeException) {
+            LOGGER.warn(
+                "Failed to close connector after failed to initialize connector. "
+                    + "Ignore this exception.",
+                closeException);
+          }
           throw new PipeException(
               "Failed to construct PipeConnector, because of " + e.getMessage(), e);
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 6ac8e70a16d..8f2cbd613d5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -39,6 +39,9 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,6 +52,9 @@ import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.SUB
 
 public class SubscriptionConnectorSubtaskManager {
 
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionConnectorSubtaskManager.class);
+
   private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
       "Failed to deregister PipeConnectorSubtask. No such subtask: ";
 
@@ -99,6 +105,14 @@ public class SubscriptionConnectorSubtaskManager {
             pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));
         pipeConnector.handshake();
       } catch (final Exception e) {
+        try {
+          pipeConnector.close();
+        } catch (final Exception closeException) {
+          LOGGER.warn(
+              "Failed to close connector after failed to initialize connector. "
+                  + "Ignore this exception.",
+              closeException);
+        }
         throw new PipeException(
             "Failed to construct PipeConnector, because of " + e.getMessage(), e);
       }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index 3e1b123ec9b..eba03711535 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -190,9 +190,17 @@ public abstract class PipePluginAgent {
     try {
       processor.validate(new PipeParameterValidator(replacedParameters));
       processor.customize(replacedParameters, runtimeConfigurations);
+      return processor;
     } catch (Exception e) {
+      try {
+        processor.close();
+      } catch (Exception closeException) {
+        LOGGER.warn(
+            "Failed to close processor after failed to initialize processor. "
+                + "Ignore this exception.",
+            closeException);
+      }
       throw new PipeException(e.getMessage(), e);
     }
-    return processor;
   }
 }

Reply via email to