This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f29fcf26d8 Pipe: Fixed connector subtask manager to allow restarted 
DataRegions to reuse connectors from other not restarted DataRegions (#12470)
6f29fcf26d8 is described below

commit 6f29fcf26d8534b68fb733003e6a6a4caadd8e58
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 6 20:12:12 2024 +0800

    Pipe: Fixed connector subtask manager to allow restarted DataRegions to 
reuse connectors from other not restarted DataRegions (#12470)
---
 .../connector/PipeConnectorSubtaskManager.java     | 29 +++++++++++------
 .../SubscriptionConnectorSubtaskManager.java       | 37 ++++++++++++++--------
 2 files changed, 44 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 968c73b4c5d..469a7cc67f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
@@ -53,9 +54,9 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor,
-      PipeParameters pipeConnectorParameters,
-      PipeTaskConnectorRuntimeEnvironment environment) {
+      final PipeConnectorSubtaskExecutor executor,
+      final PipeParameters pipeConnectorParameters,
+      final PipeTaskConnectorRuntimeEnvironment environment) {
     final String connectorKey =
         pipeConnectorParameters
             .getStringOrDefault(
@@ -77,7 +78,7 @@ public class PipeConnectorSubtaskManager {
             .contains(new DataRegionId(environment.getRegionId()));
 
     final int connectorNum;
-    String attributeSortedString = new 
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
+    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
     if (isDataRegionConnector) {
       connectorNum =
           pipeConnectorParameters.getIntOrDefault(
@@ -115,7 +116,7 @@ public class PipeConnectorSubtaskManager {
           pipeConnector.customize(
               pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
           pipeConnector.handshake();
-        } catch (Exception e) {
+        } catch (final Exception e) {
           throw new PipeException(
               "Failed to construct PipeConnector, because of " + 
e.getMessage(), e);
         }
@@ -149,7 +150,10 @@ public class PipeConnectorSubtaskManager {
   }
 
   public synchronized void deregister(
-      String pipeName, long creationTime, int dataRegionId, String 
attributeSortedString) {
+      final String pipeName,
+      final long creationTime,
+      final int dataRegionId,
+      final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
@@ -165,7 +169,7 @@ public class PipeConnectorSubtaskManager {
     PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
   }
 
-  public synchronized void start(String attributeSortedString) {
+  public synchronized void start(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
@@ -176,7 +180,7 @@ public class PipeConnectorSubtaskManager {
     }
   }
 
-  public synchronized void stop(String attributeSortedString) {
+  public synchronized void stop(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
@@ -188,7 +192,7 @@ public class PipeConnectorSubtaskManager {
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
-      String attributeSortedString) {
+      final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
           "Failed to get PendingQueue. No such subtask: " + 
attributeSortedString);
@@ -201,6 +205,13 @@ public class PipeConnectorSubtaskManager {
         .getPendingQueue();
   }
 
+  private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
+    final TreeMap<String, String> sortedStringSourceMap =
+        new TreeMap<>(pipeConnectorParameters.getAttribute());
+    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    return sortedStringSourceMap.toString();
+  }
+
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeConnectorSubtaskManager() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 6627bf2948a..6ac8e70a16d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.task.subtask;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
@@ -56,9 +57,9 @@ public class SubscriptionConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor,
-      PipeParameters pipeConnectorParameters,
-      PipeTaskConnectorRuntimeEnvironment environment) {
+      final PipeConnectorSubtaskExecutor executor,
+      final PipeParameters pipeConnectorParameters,
+      final PipeTaskConnectorRuntimeEnvironment environment) {
     final String connectorKey =
         pipeConnectorParameters
             .getStringOrDefault(
@@ -79,7 +80,7 @@ public class SubscriptionConnectorSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    String attributeSortedString = new 
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
+    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
     attributeSortedString = "__subscription_" + attributeSortedString;
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
@@ -97,7 +98,7 @@ public class SubscriptionConnectorSubtaskManager {
         pipeConnector.customize(
             pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
         pipeConnector.handshake();
-      } catch (Exception e) {
+      } catch (final Exception e) {
         throw new PipeException(
             "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
       }
@@ -133,7 +134,7 @@ public class SubscriptionConnectorSubtaskManager {
           attributeSortedString, pipeConnectorSubtaskLifeCycle);
     }
 
-    PipeConnectorSubtaskLifeCycle lifeCycle =
+    final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
     lifeCycle.register();
 
@@ -141,12 +142,15 @@ public class SubscriptionConnectorSubtaskManager {
   }
 
   public synchronized void deregister(
-      String pipeName, long creationTime, int dataRegionId, String 
attributeSortedString) {
+      final String pipeName,
+      final long creationTime,
+      final int dataRegionId,
+      final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    PipeConnectorSubtaskLifeCycle lifeCycle =
+    final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
     if (lifeCycle.deregister(pipeName)) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
@@ -155,28 +159,28 @@ public class SubscriptionConnectorSubtaskManager {
     PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
   }
 
-  public synchronized void start(String attributeSortedString) {
+  public synchronized void start(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    PipeConnectorSubtaskLifeCycle lifeCycle =
+    final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
     lifeCycle.start();
   }
 
-  public synchronized void stop(String attributeSortedString) {
+  public synchronized void stop(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    PipeConnectorSubtaskLifeCycle lifeCycle =
+    final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
     lifeCycle.stop();
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
-      String attributeSortedString) {
+      final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
           "Failed to get PendingQueue. No such subtask: " + 
attributeSortedString);
@@ -185,6 +189,13 @@ public class SubscriptionConnectorSubtaskManager {
     return 
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
   }
 
+  private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
+    final TreeMap<String, String> sortedStringSourceMap =
+        new TreeMap<>(pipeConnectorParameters.getAttribute());
+    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    return sortedStringSourceMap.toString();
+  }
+
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private SubscriptionConnectorSubtaskManager() {

Reply via email to