This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f29fcf26d8 Pipe: Fixed connector subtask manager to allow restarted
DataRegions to reuse connectors from other not restarted DataRegions (#12470)
6f29fcf26d8 is described below
commit 6f29fcf26d8534b68fb733003e6a6a4caadd8e58
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 6 20:12:12 2024 +0800
Pipe: Fixed connector subtask manager to allow restarted DataRegions to
reuse connectors from other not restarted DataRegions (#12470)
---
.../connector/PipeConnectorSubtaskManager.java | 29 +++++++++++------
.../SubscriptionConnectorSubtaskManager.java | 37 ++++++++++++++--------
2 files changed, 44 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 968c73b4c5d..469a7cc67f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
@@ -53,9 +54,9 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
- PipeConnectorSubtaskExecutor executor,
- PipeParameters pipeConnectorParameters,
- PipeTaskConnectorRuntimeEnvironment environment) {
+ final PipeConnectorSubtaskExecutor executor,
+ final PipeParameters pipeConnectorParameters,
+ final PipeTaskConnectorRuntimeEnvironment environment) {
final String connectorKey =
pipeConnectorParameters
.getStringOrDefault(
@@ -77,7 +78,7 @@ public class PipeConnectorSubtaskManager {
.contains(new DataRegionId(environment.getRegionId()));
final int connectorNum;
- String attributeSortedString = new
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
+ String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
if (isDataRegionConnector) {
connectorNum =
pipeConnectorParameters.getIntOrDefault(
@@ -115,7 +116,7 @@ public class PipeConnectorSubtaskManager {
pipeConnector.customize(
pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException(
"Failed to construct PipeConnector, because of " +
e.getMessage(), e);
}
@@ -149,7 +150,10 @@ public class PipeConnectorSubtaskManager {
}
public synchronized void deregister(
- String pipeName, long creationTime, int dataRegionId, String
attributeSortedString) {
+ final String pipeName,
+ final long creationTime,
+ final int dataRegionId,
+ final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
@@ -165,7 +169,7 @@ public class PipeConnectorSubtaskManager {
PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
}
- public synchronized void start(String attributeSortedString) {
+ public synchronized void start(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
@@ -176,7 +180,7 @@ public class PipeConnectorSubtaskManager {
}
}
- public synchronized void stop(String attributeSortedString) {
+ public synchronized void stop(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
@@ -188,7 +192,7 @@ public class PipeConnectorSubtaskManager {
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
- String attributeSortedString) {
+ final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
"Failed to get PendingQueue. No such subtask: " +
attributeSortedString);
@@ -201,6 +205,13 @@ public class PipeConnectorSubtaskManager {
.getPendingQueue();
}
+ private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
+ final TreeMap<String, String> sortedStringSourceMap =
+ new TreeMap<>(pipeConnectorParameters.getAttribute());
+ sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+ return sortedStringSourceMap.toString();
+ }
+
///////////////////////// Singleton Instance Holder
/////////////////////////
private PipeConnectorSubtaskManager() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 6627bf2948a..6ac8e70a16d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.task.subtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
@@ -56,9 +57,9 @@ public class SubscriptionConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
- PipeConnectorSubtaskExecutor executor,
- PipeParameters pipeConnectorParameters,
- PipeTaskConnectorRuntimeEnvironment environment) {
+ final PipeConnectorSubtaskExecutor executor,
+ final PipeParameters pipeConnectorParameters,
+ final PipeTaskConnectorRuntimeEnvironment environment) {
final String connectorKey =
pipeConnectorParameters
.getStringOrDefault(
@@ -79,7 +80,7 @@ public class SubscriptionConnectorSubtaskManager {
environment.getRegionId(),
connectorKey);
- String attributeSortedString = new
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
+ String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
attributeSortedString = "__subscription_" + attributeSortedString;
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
@@ -97,7 +98,7 @@ public class SubscriptionConnectorSubtaskManager {
pipeConnector.customize(
pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException(
"Failed to construct PipeConnector, because of " + e.getMessage(),
e);
}
@@ -133,7 +134,7 @@ public class SubscriptionConnectorSubtaskManager {
attributeSortedString, pipeConnectorSubtaskLifeCycle);
}
- PipeConnectorSubtaskLifeCycle lifeCycle =
+ final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
lifeCycle.register();
@@ -141,12 +142,15 @@ public class SubscriptionConnectorSubtaskManager {
}
public synchronized void deregister(
- String pipeName, long creationTime, int dataRegionId, String
attributeSortedString) {
+ final String pipeName,
+ final long creationTime,
+ final int dataRegionId,
+ final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
- PipeConnectorSubtaskLifeCycle lifeCycle =
+ final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
if (lifeCycle.deregister(pipeName)) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
@@ -155,28 +159,28 @@ public class SubscriptionConnectorSubtaskManager {
PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
}
- public synchronized void start(String attributeSortedString) {
+ public synchronized void start(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
- PipeConnectorSubtaskLifeCycle lifeCycle =
+ final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
lifeCycle.start();
}
- public synchronized void stop(String attributeSortedString) {
+ public synchronized void stop(final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
- PipeConnectorSubtaskLifeCycle lifeCycle =
+ final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
lifeCycle.stop();
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
- String attributeSortedString) {
+ final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
"Failed to get PendingQueue. No such subtask: " +
attributeSortedString);
@@ -185,6 +189,13 @@ public class SubscriptionConnectorSubtaskManager {
return
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
}
+ private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
+ final TreeMap<String, String> sortedStringSourceMap =
+ new TreeMap<>(pipeConnectorParameters.getAttribute());
+ sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+ return sortedStringSourceMap.toString();
+ }
+
///////////////////////// Singleton Instance Holder
/////////////////////////
private SubscriptionConnectorSubtaskManager() {