This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f971de235ec Fix old pipe root user compatibility (#17985)
f971de235ec is described below
commit f971de235eca05453c8b6fec27ffe2d5a7f9ed22
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 22 09:37:45 2026 +0800
Fix old pipe root user compatibility (#17985)
* Fix old pipe root user compatibility
* Fix ConfigurationFileUtilsTest import
* Update AbstractEnv.java
* Address pipe compatibility review comments
---
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 3 +
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 3 +
.../iotdb/confignode/manager/ConfigManager.java | 4 +-
.../persistence/executor/ConfigPlanExecutor.java | 1 +
.../confignode/persistence/pipe/PipeInfo.java | 8 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 55 ++++++-
.../pipe/PipeTaskInfoAutoRestartTest.java | 169 ++++++++++++++++++++-
.../pipe/agent/task/meta/PipeStaticMeta.java | 60 ++++++++
8 files changed, 292 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index f662fa5871d..69a69c76db7 100644
---
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -216,6 +216,9 @@ public final class ConfigNodeMessages {
"Failed to drop trigger [%s], this trigger has not been created";
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
"Failed to drop UDF [%s], this UDF has not been created";
+ public static final String
+
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST
=
+ "Failed to enrich pipe %s with root user for compatibility because
root user %s does not exist.";
public static final String
FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
"Failed to fetch schemaengine black list on DataNode {}, {}";
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 8bffa1b0831..6bf3da0e68b 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -212,6 +212,9 @@ public final class ConfigNodeMessages {
"Failed to drop trigger [%s], this trigger has not been created";
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
"Failed to drop UDF [%s], this UDF has not been created";
+ public static final String
+
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST
=
+ "Failed to enrich pipe %s with root user for compatibility because
root user %s does not exist.";
public static final String
FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
"Failed to fetch schemaengine black list on DataNode {}, {}";
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 92a22fe992f..da10bb22283 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -375,7 +375,8 @@ public class ConfigManager implements IManager {
TriggerInfo triggerInfo = new TriggerInfo();
CQInfo cqInfo = new CQInfo();
ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo();
- PipeInfo pipeInfo = new PipeInfo();
+ this.permissionManager = createPermissionManager(authorInfo);
+ PipeInfo pipeInfo = new PipeInfo(userName ->
this.permissionManager.login4Pipe(userName, null));
QuotaInfo quotaInfo = new QuotaInfo();
TTLInfo ttlInfo = new TTLInfo();
SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
@@ -409,7 +410,6 @@ public class ConfigManager implements IManager {
new ClusterSchemaQuotaStatistics(
COMMON_CONF.getSeriesLimitThreshold(),
COMMON_CONF.getDeviceLimitThreshold()));
this.partitionManager = new PartitionManager(this, partitionInfo);
- this.permissionManager = createPermissionManager(authorInfo);
this.procedureManager = createProcedureManager(procedureInfo);
this.externalServiceManager = new ExternalServiceManager(this);
this.udfManager = new UDFManager(this, udfInfo);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index ed23fda8a6f..4a70ace8ca1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -801,6 +801,7 @@ public class ConfigPlanExecutor {
}
});
if (result.get()) {
+ pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility();
LOGGER.info(
ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR,
latestSnapshotRootDir);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index e353398cd9d..bef26386afd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.function.Function;
public class PipeInfo implements SnapshotProcessor {
@@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor {
private final PipeTaskInfo pipeTaskInfo;
public PipeInfo() throws IOException {
+ this(null);
+ }
+
+ public PipeInfo(final Function<String, String>
pipeUserCurrentPasswordProvider)
+ throws IOException {
pipePluginInfo = new PipePluginInfo();
- pipeTaskInfo = new PipeTaskInfo();
+ pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider);
}
public PipePluginInfo getPipePluginInfo() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 3c2e331b0a1..247f803152b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
@@ -56,7 +57,6 @@ import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceM
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -76,9 +76,11 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -95,10 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
// Pure in-memory object, not involved in snapshot serialization and
deserialization.
private final PipeTaskInfoVersion pipeTaskInfoVersion;
+ // Accepts a username and returns its current stored password for pipe
authentication.
+ private final Function<String, String> pipeUserCurrentPasswordProvider;
public PipeTaskInfo() {
+ this(null);
+ }
+
+ public PipeTaskInfo(final Function<String, String>
pipeUserCurrentPasswordProvider) {
this.pipeMetaKeeper = new PipeMetaKeeper();
this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
+ this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider;
}
/////////////////////////////// Lock ///////////////////////////////
@@ -445,6 +454,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
public TSStatus createPipe(final CreatePipePlanV2 plan) {
acquireWriteLock();
try {
+ enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(),
plan.getPipeRuntimeMeta()));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
@@ -502,6 +512,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
public TSStatus alterPipe(final AlterPipePlanV2 plan) {
acquireWriteLock();
try {
+ enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
final PipeTemporaryMeta temporaryMeta =
pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta();
pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
@@ -719,6 +730,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
plan.getPipeMetaList()
.forEach(
pipeMeta -> {
+
enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta());
pipeMetaKeeper.addPipeMeta(pipeMeta);
logger.ifPresent(l ->
l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta));
});
@@ -998,6 +1010,47 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ public void enrichPipeMetasWithRootUserForCompatibility() {
+ acquireWriteLock();
+ try {
+ pipeMetaKeeper
+ .getPipeMetaList()
+ .forEach(
+ pipeMeta ->
enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()));
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta
pipeStaticMeta) {
+ if (pipeUserCurrentPasswordProvider == null) {
+ return;
+ }
+ final boolean shouldEnrichSource =
pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource();
+ final boolean shouldEnrichSink =
pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink();
+ if (!shouldEnrichSource && !shouldEnrichSink) {
+ return;
+ }
+
+ final String rootUserName =
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+ final String password =
pipeUserCurrentPasswordProvider.apply(rootUserName);
+ if (Objects.isNull(password)) {
+ throw new PipeException(
+ String.format(
+ ConfigNodeMessages
+
.FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST,
+ pipeStaticMeta.getPipeName(),
+ rootUserName));
+ }
+
+ if (shouldEnrichSource) {
+ pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName,
password);
+ }
+ if (shouldEnrichSink) {
+
pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName,
password);
+ }
+ }
+
private void normalizeRecoveredConsensusPipeStatus() {
final List<String> restartedConsensusPipes = new ArrayList<>();
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
index 762c3bf045c..7b78f59253d 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
@@ -20,11 +20,15 @@
package org.apache.iotdb.confignode.persistence.pipe;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -88,6 +92,145 @@ public class PipeTaskInfoAutoRestartTest {
Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
}
+ @Test
+ public void testEnrichOldUserPipeWithRootUserForCompatibility() {
+ final String rootUserName =
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+ final String rootPassword = "root-current-password";
+ pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+ createPipe("oldPipe", PipeStatus.STOPPED);
+
+ final Map<String, String> sourceAttributes =
+ pipeTaskInfo
+ .getPipeMetaByPipeName("oldPipe")
+ .getStaticMeta()
+ .getSourceParameters()
+ .getAttribute();
+ Assert.assertEquals(
+ String.valueOf(IoTDBConstant.SUPER_USER_ID),
+ sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USER_ID));
+ Assert.assertEquals(
+ rootUserName,
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+ Assert.assertEquals(
+ rootPassword,
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ Assert.assertFalse(
+ pipeTaskInfo
+ .getPipeMetaByPipeName("oldPipe")
+ .getStaticMeta()
+ .getSinkParameters()
+ .getAttribute()
+ .containsKey(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+ }
+
+ @Test
+ public void testDoNotOverwritePipeWithUserForCompatibility() {
+ pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
+
+ createPipeWithSourceAttributes(
+ "newPipe",
+ new HashMap<String, String>() {
+ {
+ put("extractor", "iotdb-source");
+ put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+ put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "user-password");
+ }
+ });
+
+ final Map<String, String> sourceAttributes =
+ pipeTaskInfo
+ .getPipeMetaByPipeName("newPipe")
+ .getStaticMeta()
+ .getSourceParameters()
+ .getAttribute();
+ Assert.assertEquals("user",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+ Assert.assertEquals(
+ "user-password",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ }
+
+ @Test
+ public void testDoNotEnrichSystemPipeForCompatibility() {
+ pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
+
+ createPipeWithSourceAttributes(
+ PipeStaticMeta.generateSubscriptionPipeName("topic", "group"),
+ new HashMap<String, String>() {
+ {
+ put("extractor", "iotdb-source");
+ }
+ });
+
+ final Map<String, String> sourceAttributes =
+ pipeTaskInfo
+
.getPipeMetaByPipeName(PipeStaticMeta.generateSubscriptionPipeName("topic",
"group"))
+ .getStaticMeta()
+ .getSourceParameters()
+ .getAttribute();
+
Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+
Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ }
+
+ @Test
+ public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() {
+ final String rootUserName =
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+ final String rootPassword = "root-current-password";
+ pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+ createPipeWithAttributes(
+ "oldWriteBackPipe",
+ new HashMap<String, String>() {
+ {
+ put("extractor", "iotdb-source");
+ put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "source-user");
+ put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
"source-password");
+ }
+ },
+ new HashMap<String, String>() {
+ {
+ put("connector", "write-back-sink");
+ }
+ });
+
+ final Map<String, String> sinkAttributes =
+ pipeTaskInfo
+ .getPipeMetaByPipeName("oldWriteBackPipe")
+ .getStaticMeta()
+ .getSinkParameters()
+ .getAttribute();
+ Assert.assertEquals(
+ String.valueOf(IoTDBConstant.SUPER_USER_ID),
+ sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USER_ID));
+ Assert.assertEquals(rootUserName,
sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY));
+ Assert.assertEquals(rootPassword,
sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+ }
+
+ @Test
+ public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() {
+ final String rootPassword = "root-current-password";
+ pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+ createPipeWithSourceAttributes(
+ "loadedPipe",
+ new HashMap<String, String>() {
+ {
+ put("extractor", "iotdb-source");
+ }
+ });
+ final Map<String, String> sourceAttributes =
+ pipeTaskInfo
+ .getPipeMetaByPipeName("loadedPipe")
+ .getStaticMeta()
+ .getSourceParameters()
+ .getAttribute();
+ sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USER_ID);
+ sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+ sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+
+ pipeTaskInfo.enrichPipeMetasWithRootUserForCompatibility();
+
+ Assert.assertEquals(
+ rootPassword,
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ }
+
private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String
pipeName) {
final TPushPipeMetaRespExceptionMessage exceptionMessage =
new TPushPipeMetaRespExceptionMessage(
@@ -101,11 +244,27 @@ public class PipeTaskInfoAutoRestartTest {
private void createPipe(final String pipeName, final PipeStatus
initialStatus) {
final Map<String, String> extractorAttributes = new HashMap<>();
- final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor", "iotdb-source");
- processorAttributes.put("processor", "do-nothing-processor");
+ createPipeWithSourceAttributes(pipeName, extractorAttributes);
+
+ if (PipeStatus.RUNNING.equals(initialStatus)) {
+ pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName,
PipeStatus.RUNNING));
+ }
+ }
+
+ private void createPipeWithSourceAttributes(
+ final String pipeName, final Map<String, String> extractorAttributes) {
+ final Map<String, String> connectorAttributes = new HashMap<>();
connectorAttributes.put("connector", "iotdb-thrift-sink");
+ createPipeWithAttributes(pipeName, extractorAttributes,
connectorAttributes);
+ }
+
+ private void createPipeWithAttributes(
+ final String pipeName,
+ final Map<String, String> extractorAttributes,
+ final Map<String, String> connectorAttributes) {
+ final Map<String, String> processorAttributes = new HashMap<>();
+ processorAttributes.put("processor", "do-nothing-processor");
final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
@@ -120,9 +279,5 @@ public class PipeTaskInfoAutoRestartTest {
connectorAttributes);
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta,
pipeRuntimeMeta));
-
- if (PipeStatus.RUNNING.equals(initialStatus)) {
- pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName,
PipeStatus.RUNNING));
- }
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
index 4b57cd64e43..9855552113c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
@@ -97,6 +99,64 @@ public class PipeStaticMeta {
.toLowerCase());
}
+ public boolean mayNeedCompatibleRootUserForIoTDBSource() {
+ final String pluginName =
+ sourceParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ return PipeType.USER.equals(getPipeType())
+ &&
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()))
+ && !sourceParameters.hasAnyAttributes(
+ PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+ }
+
+ public boolean mayNeedCompatibleRootUserForWriteBackSink() {
+ final String pluginName =
+ sinkParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
+ BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+ .toLowerCase();
+
+ return PipeType.USER.equals(getPipeType())
+ &&
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName()))
+ && !sinkParameters.hasAnyAttributes(
+ PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
+ PipeSinkConstant.SINK_IOTDB_USER_KEY,
+ PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
+ PipeSinkConstant.SINK_IOTDB_USERNAME_KEY,
+ PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+ PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
+ }
+
+ public void enrichSourceWithRootUserForCompatibility(
+ final String rootUserName, final String password) {
+ sourceParameters
+ .getAttribute()
+ .put(PipeSourceConstant.SOURCE_IOTDB_USER_ID,
String.valueOf(IoTDBConstant.SUPER_USER_ID));
+
sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
rootUserName);
+
sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
password);
+ }
+
+ public void enrichWriteBackSinkWithRootUserForCompatibility(
+ final String rootUserName, final String password) {
+ sinkParameters
+ .getAttribute()
+ .put(PipeSinkConstant.SINK_IOTDB_USER_ID,
String.valueOf(IoTDBConstant.SUPER_USER_ID));
+
sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY,
rootUserName);
+
sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY,
password);
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);