This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 935b9a8f00e Pipe: Fixed the config meta transfer problem (#16280)
935b9a8f00e is described below
commit 935b9a8f00ed739e31f71855326e4d81a29b474c
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 27 14:19:22 2025 +0800
Pipe: Fixed the config meta transfer problem (#16280)
* table-fix
* refactor
* fix
---
.../response/pipe/task/PipeTableResp.java | 11 ++--
.../agent/runtime/PipeConfigNodeRuntimeAgent.java | 11 +---
.../pipe/agent/task/PipeConfigNodeSubtask.java | 51 ++++++++---------
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 11 +---
.../pipe/agent/task/PipeConfigNodeTaskBuilder.java | 6 +-
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 1 +
.../pipe/source/IoTDBConfigRegionSource.java | 2 +-
.../confignode/persistence/pipe/PipeInfo.java | 10 ++--
.../confignode/persistence/pipe/PipeTaskInfo.java | 16 +++---
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 4 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 4 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 4 +-
.../pipe/util/PipeExternalSourceLoadBalancer.java | 2 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 19 +++----
.../agent/task/builder/PipeDataNodeBuilder.java | 2 +-
.../task/builder/PipeDataNodeTaskBuilder.java | 6 +-
.../pipe/agent/task/stage/PipeTaskSourceStage.java | 9 ++-
.../agent/task/subtask/sink/PipeSinkSubtask.java | 66 +++++++++++-----------
.../task/subtask/sink/PipeSinkSubtaskManager.java | 6 +-
...istoricalDataRegionTsFileAndDeletionSource.java | 6 +-
.../realtime/PipeRealtimeDataRegionSource.java | 6 +-
.../db/pipe/source/mqtt/MQTTPublishHandler.java | 4 +-
.../iotdb/db/pipe/source/mqtt/MQTTSource.java | 6 +-
.../config/executor/ClusterConfigTaskExecutor.java | 14 ++---
.../subtask/SubscriptionSinkSubtaskManager.java | 2 +-
.../agent/plugin/PipeDataNodePluginAgentTest.java | 4 +-
.../db/pipe/consensus/DeletionResourceTest.java | 4 +-
.../pattern/CachedSchemaPatternMatcherTest.java | 9 ++-
.../db/pipe/source/PipeRealtimeExtractTest.java | 10 ++--
.../commons/pipe/agent/plugin/PipePluginAgent.java | 8 +--
.../commons/pipe/agent/task/PipeTaskAgent.java | 14 ++---
.../pipe/agent/task/meta/PipeStaticMeta.java | 58 +++++++++----------
.../task/subtask/PipeAbstractSinkSubtask.java | 16 +++---
....java => PipeTaskSourceRuntimeEnvironment.java} | 9 ++-
.../iotdb/commons/pipe/source/IoTDBSource.java | 6 +-
35 files changed, 199 insertions(+), 218 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 37c3d1bd03b..ec596588b1e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -77,7 +77,7 @@ public class PipeTableResp implements DataSet {
allPipeMeta.stream()
.filter(pipeMeta ->
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.findFirst()
- .map(pipeMeta ->
pipeMeta.getStaticMeta().getConnectorParameters().toString())
+ .map(pipeMeta ->
pipeMeta.getStaticMeta().getSinkParameters().toString())
.orElse(null);
return new PipeTableResp(
@@ -87,7 +87,7 @@ public class PipeTableResp implements DataSet {
pipeMeta ->
pipeMeta
.getStaticMeta()
- .getConnectorParameters()
+ .getSinkParameters()
.toString()
.equals(sortedConnectorParametersString))
.collect(Collectors.toList()));
@@ -177,10 +177,9 @@ public class PipeTableResp implements DataSet {
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
runtimeMeta.getStatus().get().name(),
-
SystemConstant.addSystemKeysIfNecessary(staticMeta.getExtractorParameters())
- .toString(),
+
SystemConstant.addSystemKeysIfNecessary(staticMeta.getSourceParameters()).toString(),
staticMeta.getProcessorParameters().toString(),
- staticMeta.getConnectorParameters().toString(),
+ staticMeta.getSinkParameters().toString(),
exceptionMessageBuilder.toString());
final PipeTemporaryMetaInCoordinator temporaryMeta =
(PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta();
@@ -206,7 +205,7 @@ public class PipeTableResp implements DataSet {
.getRegisteredDataNodeCount()
== 1
&& ConfigRegionListeningFilter.parseListeningPlanTypeSet(
- pipeMeta.getStaticMeta().getExtractorParameters())
+ pipeMeta.getStaticMeta().getSourceParameters())
.isEmpty();
} catch (final IllegalPathException e) {
return false;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index 372b0334a24..e8cc16a1edf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.manager.pipe.agent.runtime;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
@@ -156,18 +155,12 @@ public class PipeConfigNodeRuntimeAgent implements
IService {
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
- // Stop all pipes locally if critical exception occurs
- if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
- PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
- }
+ // Do not call "stopAllPipesWithCriticalException" because the sinks are
not reused in
+ // ConfigNodeSubtask
}
/////////////////////////// Periodical Job Executor
///////////////////////////
- public void registerPeriodicalJob(String id, Runnable periodicalJob, long
intervalInSeconds) {
- pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
- }
-
public void registerPhantomReferenceCleanJob(
String id, Runnable periodicalJob, long intervalInSeconds) {
pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob,
intervalInSeconds);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 0d6d3fea9d6..2403e353b56 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -26,9 +26,9 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -56,7 +56,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
private final PipeTaskMeta pipeTaskMeta;
// Pipe plugins for this subtask
- private PipeExtractor extractor;
+ private PipeExtractor source;
// TODO: currently unused
@SuppressWarnings("unused")
@@ -65,9 +65,9 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
public PipeConfigNodeSubtask(
final String pipeName,
final long creationTime,
- final Map<String, String> extractorAttributes,
+ final Map<String, String> sourceAttributes,
final Map<String, String> processorAttributes,
- final Map<String, String> connectorAttributes,
+ final Map<String, String> sinkAttributes,
final PipeTaskMeta pipeTaskMeta)
throws Exception {
// We initialize outputPipeConnector by initConnector()
@@ -75,34 +75,34 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
- initExtractor(extractorAttributes);
+ initSource(sourceAttributes);
initProcessor(processorAttributes);
- initConnector(connectorAttributes);
+ initSink(sinkAttributes);
PipeConfigRegionSinkMetrics.getInstance().register(this);
PipeEventCommitManager.getInstance()
.register(pipeName, creationTime, CONFIG_REGION_ID.getId(), pipeName +
"_" + creationTime);
}
- private void initExtractor(final Map<String, String> extractorAttributes)
throws Exception {
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
+ private void initSource(final Map<String, String> sourceAttributes) throws
Exception {
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
// 1. Construct extractor
- extractor =
PipeConfigNodeAgent.plugin().reflectExtractor(extractorParameters);
+ source = PipeConfigNodeAgent.plugin().reflectSource(sourceParameters);
try {
// 2. Validate extractor parameters
- extractor.validate(new PipeParameterValidator(extractorParameters));
+ source.validate(new PipeParameterValidator(sourceParameters));
// 3. Customize extractor
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
pipeName, creationTime, CONFIG_REGION_ID.getId(),
pipeTaskMeta));
- extractor.customize(extractorParameters, runtimeConfiguration);
+ source.customize(sourceParameters, runtimeConfiguration);
} catch (final Exception e) {
try {
- extractor.close();
+ source.close();
} catch (Exception closeException) {
LOGGER.warn(
"Failed to close extractor after failed to initialize extractor. "
@@ -131,31 +131,30 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
runtimeConfiguration);
}
- private void initConnector(final Map<String, String> connectorAttributes)
throws Exception {
- final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
+ private void initSink(final Map<String, String> sinkAttributes) throws
Exception {
+ final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
// 1. Construct connector
- outputPipeConnector =
PipeConfigNodeAgent.plugin().reflectConnector(connectorParameters);
+ outputPipeSink = PipeConfigNodeAgent.plugin().reflectSink(sinkParameters);
try {
// 2. Validate connector parameters
- outputPipeConnector.validate(new
PipeParameterValidator(connectorParameters));
+ outputPipeSink.validate(new PipeParameterValidator(sinkParameters));
// 3. Customize connector
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime,
CONFIG_REGION_ID.getId()));
- outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
+ outputPipeSink.customize(sinkParameters, runtimeConfiguration);
// 4. Handshake
- outputPipeConnector.handshake();
+ outputPipeSink.handshake();
} catch (final Exception e) {
try {
- outputPipeConnector.close();
+ outputPipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
- "Failed to close connector after failed to initialize connector. "
- + "Ignore this exception.",
+ "Failed to close sink after failed to initialize it. Ignore this
exception.",
closeException);
}
throw e;
@@ -175,7 +174,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
return false;
}
- final Event event = lastEvent != null ? lastEvent : extractor.supply();
+ final Event event = lastEvent != null ? lastEvent : source.supply();
// Record the last event for retry when exception occurs
setLastEvent(event);
@@ -185,7 +184,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
}
if (!(event instanceof ProgressReportEvent)) {
- outputPipeConnector.transfer(event);
+ outputPipeSink.transfer(event);
PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID);
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
@@ -226,7 +225,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
PipeConfigRegionSinkMetrics.getInstance().deregister(taskID);
try {
- extractor.close();
+ source.close();
} catch (final Exception e) {
LOGGER.info("Error occurred during closing PipeExtractor.", e);
}
@@ -238,7 +237,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
}
try {
- outputPipeConnector.close();
+ outputPipeSink.close();
} catch (final Exception e) {
LOGGER.info("Error occurred during closing PipeConnector.", e);
} finally {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 3f66d8aa7c0..4f90d147e26 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -91,16 +91,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
&& pipeTaskMeta.getLeaderNodeId()
== ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()
&& !ConfigRegionListeningFilter.parseListeningPlanTypeSet(
- pipeStaticMeta.getExtractorParameters())
+ pipeStaticMeta.getSourceParameters())
.isEmpty()) {
final PipeConfigNodeTask pipeTask =
new PipeConfigNodeTask(
new PipeConfigNodeTaskStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getExtractorParameters().getAttribute(),
+ pipeStaticMeta.getSourceParameters().getAttribute(),
pipeStaticMeta.getProcessorParameters().getAttribute(),
- pipeStaticMeta.getConnectorParameters().getAttribute(),
+ pipeStaticMeta.getSinkParameters().getAttribute(),
pipeTaskMeta));
pipeTask.create();
pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
@@ -113,11 +113,6 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent
{
.put(consensusGroupId, pipeTaskMeta);
}
- public void stopAllPipesWithCriticalException() {
- super.stopAllPipesWithCriticalException(
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId());
- }
-
@Override
protected TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChangesInternal(
final PipeMeta pipeMetaFromCoordinator) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
index bab8badcb59..bdae3f0f07a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
@@ -52,7 +52,7 @@ public class PipeConfigNodeTaskBuilder {
&& consensusGroupIdToPipeTaskMeta.getValue().getLeaderNodeId()
== ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()
&& !ConfigRegionListeningFilter.parseListeningPlanTypeSet(
- pipeStaticMeta.getExtractorParameters())
+ pipeStaticMeta.getSourceParameters())
.isEmpty()) {
consensusGroupIdToPipeTaskMap.put(
consensusGroupId,
@@ -60,9 +60,9 @@ public class PipeConfigNodeTaskBuilder {
new PipeConfigNodeTaskStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getExtractorParameters().getAttribute(),
+ pipeStaticMeta.getSourceParameters().getAttribute(),
pipeStaticMeta.getProcessorParameters().getAttribute(),
- pipeStaticMeta.getConnectorParameters().getAttribute(),
+ pipeStaticMeta.getSinkParameters().getAttribute(),
consensusGroupIdToPipeTaskMeta.getValue())));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 3f92e05f33b..e1b6ce7c707 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -566,6 +566,7 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
private TSStatus visitCommonTablePlan(final ConfigPhysicalPlan plan, final
TSStatus context) {
if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()
+ || context.getCode() ==
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()
|| context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()
|| context.getCode() ==
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
|| context.getCode() ==
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 685e7b08451..ef1e08f8536 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -91,7 +91,7 @@ public class IoTDBConfigRegionSource extends
IoTDBNonDataRegionSource {
.getConfigNodeConsensusProtocolClass()
.equals(ConsensusFactory.SIMPLE_CONSENSUS)) {
throw new PipeException(
- "IoTDBConfigRegionExtractor does not transferring events under
simple consensus");
+ "IoTDBConfigRegionSource does not transferring events under simple
consensus");
}
super.customize(parameters, configuration);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index 409f436bdf4..0861bed200b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -88,7 +88,7 @@ public class PipeInfo implements SnapshotProcessor {
() -> {
try {
PipeConfigNodeAgent.runtime()
-
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
+
.increaseListenerReference(plan.getPipeStaticMeta().getSourceParameters());
return null;
} catch (final Exception e) {
throw new PipeException("Failed to increase listener
reference", e);
@@ -138,7 +138,7 @@ public class PipeInfo implements SnapshotProcessor {
meta -> {
try {
PipeConfigNodeAgent.runtime()
-
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
+
.decreaseListenerReference(meta.getStaticMeta().getSourceParameters());
} catch (final Exception e) {
throw new PipeException("Failed to decrease listener
reference", e);
}
@@ -171,12 +171,12 @@ public class PipeInfo implements SnapshotProcessor {
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
if (message == null) {
PipeConfigNodeAgent.runtime()
-
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
+
.increaseListenerReference(plan.getPipeStaticMeta().getSourceParameters());
pipeMetaBeforeAlter.ifPresent(
meta -> {
try {
PipeConfigNodeAgent.runtime()
-
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
+
.decreaseListenerReference(meta.getStaticMeta().getSourceParameters());
} catch (final Exception e) {
throw new PipeException("Failed to decrease listener
reference", e);
}
@@ -273,7 +273,7 @@ public class PipeInfo implements SnapshotProcessor {
for (final PipeMeta pipeMeta : pipeTaskInfo.getPipeMetaList()) {
PipeConfigNodeAgent.runtime()
-
.increaseListenerReference(pipeMeta.getStaticMeta().getExtractorParameters());
+
.increaseListenerReference(pipeMeta.getStaticMeta().getSourceParameters());
}
} catch (final Exception ex) {
LOGGER.error("Failed to load pipe task info from snapshot", ex);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 93cf882b5b1..6aadee63719 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -214,9 +214,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
new PipeStaticMeta(
pipeStaticMetaFromCoordinator.getPipeName(),
pipeStaticMetaFromCoordinator.getCreationTime(),
- new
HashMap<>(pipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute()),
+ new
HashMap<>(pipeStaticMetaFromCoordinator.getSourceParameters().getAttribute()),
new
HashMap<>(pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute()),
- new
HashMap<>(pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute()));
+ new
HashMap<>(pipeStaticMetaFromCoordinator.getSinkParameters().getAttribute()));
// 1. In modify mode, based on the passed attributes:
// 1.1. if they are empty, the original attributes are filled directly.
@@ -225,11 +225,11 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (!alterPipeRequest.isReplaceAllExtractorAttributes) { // modify mode
if (alterPipeRequest.getExtractorAttributes().isEmpty()) {
alterPipeRequest.setExtractorAttributes(
-
copiedPipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute());
+
copiedPipeStaticMetaFromCoordinator.getSourceParameters().getAttribute());
} else {
alterPipeRequest.setExtractorAttributes(
copiedPipeStaticMetaFromCoordinator
- .getExtractorParameters()
+ .getSourceParameters()
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeRequest.getExtractorAttributes()))
.getAttribute());
@@ -253,11 +253,11 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
alterPipeRequest.setConnectorAttributes(
-
copiedPipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
+
copiedPipeStaticMetaFromCoordinator.getSinkParameters().getAttribute());
} else {
alterPipeRequest.setConnectorAttributes(
copiedPipeStaticMetaFromCoordinator
- .getConnectorParameters()
+ .getSinkParameters()
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeRequest.getConnectorAttributes()))
.getAttribute());
@@ -396,7 +396,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
private void validatePipePluginUsageByPipeInternal(String pluginName) {
Iterable<PipeMeta> pipeMetas = getPipeMetaList();
for (PipeMeta pipeMeta : pipeMetas) {
- PipeParameters extractorParameters =
pipeMeta.getStaticMeta().getExtractorParameters();
+ PipeParameters extractorParameters =
pipeMeta.getStaticMeta().getSourceParameters();
final String extractorPluginName =
extractorParameters.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
@@ -420,7 +420,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- PipeParameters connectorParameters =
pipeMeta.getStaticMeta().getConnectorParameters();
+ PipeParameters connectorParameters =
pipeMeta.getStaticMeta().getSinkParameters();
final String connectorPluginName =
connectorParameters.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index bd343412660..c48c450157c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -599,9 +599,7 @@ public abstract class AbstractOperatePipeProcedureV2
try {
return !DataRegionListeningFilter.shouldDatabaseBeListened(
- copiedPipeMeta.getStaticMeta().getExtractorParameters(),
- isTableModel,
- database);
+ copiedPipeMeta.getStaticMeta().getSourceParameters(),
isTableModel, database);
} catch (final Exception e) {
return false;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index ab67185dacd..20e23a64701 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -127,7 +127,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
new PipeExternalSourceLoadBalancer(
pipeMeta
.getStaticMeta()
- .getExtractorParameters()
+ .getSourceParameters()
.getStringOrDefault(
Arrays.asList(
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY,
@@ -136,7 +136,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
final int parallelism =
pipeMeta
.getStaticMeta()
- .getExtractorParameters()
+ .getSourceParameters()
.getIntOrDefault(
Arrays.asList(
EXTERNAL_EXTRACTOR_PARALLELISM_KEY,
EXTERNAL_SOURCE_PARALLELISM_KEY),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 004a3ab47fb..2fd698112fc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -276,7 +276,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
final PipeExternalSourceLoadBalancer loadBalancer =
new PipeExternalSourceLoadBalancer(
pipeStaticMeta
- .getExtractorParameters()
+ .getSourceParameters()
.getStringOrDefault(
Arrays.asList(
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY,
@@ -284,7 +284,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY));
final int parallelism =
pipeStaticMeta
- .getExtractorParameters()
+ .getSourceParameters()
.getIntOrDefault(
Arrays.asList(
EXTERNAL_EXTRACTOR_PARALLELISM_KEY,
EXTERNAL_SOURCE_PARALLELISM_KEY),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
index 06215ba46d1..76756a579f0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
@@ -96,7 +96,7 @@ public class PipeExternalSourceLoadBalancer {
// evenly distributed across running DataNodes.
// 2. If no DataNodes are available, a PipeException is thrown.
if (pipeStaticMeta
- .getExtractorParameters()
+ .getSourceParameters()
.getBooleanOrDefault(
Arrays.asList(
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index d15391fc226..a2503fec8a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -171,7 +171,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final PipeTaskMeta pipeTaskMeta)
throws IllegalPathException {
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
- final PipeParameters sourceParameters =
pipeStaticMeta.getExtractorParameters();
+ final PipeParameters sourceParameters =
pipeStaticMeta.getSourceParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
@@ -240,7 +240,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// Check each pipe
for (final PipeMeta pipeMetaFromCoordinator : pipeMetaListFromCoordinator)
{
if (SchemaRegionListeningFilter.parseListeningPlanTypeSet(
- pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters())
+ pipeMetaFromCoordinator.getStaticMeta().getSourceParameters())
.isEmpty()) {
continue;
}
@@ -359,14 +359,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// subscribed pipe, so the subscription needs to be manually marked as
completed.
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
final String topicName =
- pipeMeta
- .getStaticMeta()
- .getConnectorParameters()
- .getString(PipeSinkConstant.SINK_TOPIC_KEY);
+
pipeMeta.getStaticMeta().getSinkParameters().getString(PipeSinkConstant.SINK_TOPIC_KEY);
final String consumerGroupId =
pipeMeta
.getStaticMeta()
- .getConnectorParameters()
+ .getSinkParameters()
.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
SubscriptionAgent.broker().updateCompletedTopicNames(consumerGroupId,
topicName);
}
@@ -430,14 +427,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final String sourceModeValue =
pipeMeta
.getStaticMeta()
- .getExtractorParameters()
+ .getSourceParameters()
.getStringOrDefault(
Arrays.asList(
PipeSourceConstant.EXTRACTOR_MODE_KEY,
PipeSourceConstant.SOURCE_MODE_KEY),
PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
- pipeMeta.getStaticMeta().getExtractorParameters())
+ pipeMeta.getStaticMeta().getSourceParameters())
.getLeft()
&&
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
|| sourceModeValue.equalsIgnoreCase(
@@ -512,9 +509,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
- pipeMeta.getStaticMeta().getExtractorParameters())
+ pipeMeta.getStaticMeta().getSourceParameters())
.getLeft()
- &&
isSnapshotMode(pipeMeta.getStaticMeta().getExtractorParameters());
+ &&
isSnapshotMode(pipeMeta.getStaticMeta().getSourceParameters());
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index e39779769d5..b55704a3da4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -63,7 +63,7 @@ public class PipeDataNodeBuilder {
final PipeTaskMeta pipeTaskMeta =
consensusGroupIdToPipeTaskMeta.getValue();
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
- final PipeParameters extractorParameters =
pipeStaticMeta.getExtractorParameters();
+ final PipeParameters extractorParameters =
pipeStaticMeta.getSourceParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
dataRegionIds.contains(dataRegionId)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 7f0b6b4ff18..bcaf58ed2b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -85,9 +85,9 @@ public class PipeDataNodeTaskBuilder {
// Analyzes the PipeParameters to identify potential conflicts.
final PipeParameters extractorParameters =
- blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
+ blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
final PipeParameters connectorParameters =
- blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
+ blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
checkConflict(extractorParameters, connectorParameters);
injectParameters(extractorParameters, connectorParameters);
@@ -135,7 +135,7 @@ public class PipeDataNodeTaskBuilder {
PROCESSOR_EXECUTOR,
pipeTaskMeta,
pipeStaticMeta
- .getConnectorParameters()
+ .getSinkParameters()
.getStringOrDefault(
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index fc3bd646549..57a804df0d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeExtractor;
@@ -51,8 +51,8 @@ public class PipeTaskSourceStage extends PipeTaskStage {
pipeExtractor =
StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))
|| PipeRuntimeMeta.isSourceExternal(regionId)
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectExtractor(extractorParameters)
- :
PipeDataNodeAgent.plugin().schemaRegion().reflectExtractor(extractorParameters);
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
// Validate and customize should be called before createSubtask. this
allows extractor exposing
// exceptions in advance.
@@ -63,8 +63,7 @@ public class PipeTaskSourceStage extends PipeTaskStage {
// 2. Customize extractor
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
- pipeName, creationTime, regionId, pipeTaskMeta));
+ new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime,
regionId, pipeTaskMeta));
pipeExtractor.customize(extractorParameters, runtimeConfiguration);
} catch (Exception e) {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index acfa13c68c5..d3c94173848 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -108,13 +108,13 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
if (event instanceof TabletInsertionEvent) {
- outputPipeConnector.transfer((TabletInsertionEvent) event);
+ outputPipeSink.transfer((TabletInsertionEvent) event);
PipeDataRegionSinkMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
- outputPipeConnector.transfer((TsFileInsertionEvent) event);
+ outputPipeSink.transfer((TsFileInsertionEvent) event);
PipeDataRegionSinkMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeSchemaRegionWritePlanEvent) {
- outputPipeConnector.transfer(event);
+ outputPipeSink.transfer(event);
if (((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
!= PlanNodeType.DELETE_DATA) {
// Only plan nodes in schema region will be marked, delete data node
is currently not
@@ -124,7 +124,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
} else if (event instanceof PipeHeartbeatEvent) {
transferHeartbeatEvent((PipeHeartbeatEvent) event);
} else {
- outputPipeConnector.transfer(
+ outputPipeSink.transfer(
event instanceof UserDefinedEnrichedEvent
? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
: event);
@@ -171,12 +171,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
try {
- outputPipeConnector.heartbeat();
- outputPipeConnector.transfer(event);
+ outputPipeSink.heartbeat();
+ outputPipeSink.transfer(event);
} catch (final Exception e) {
throw new PipeConnectionException(
"PipeConnector: "
- + outputPipeConnector.getClass().getName()
+ + outputPipeSink.getClass().getName()
+ "(id: "
+ taskID
+ ")"
@@ -200,11 +200,11 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
isClosed.set(true);
try {
final long startTime = System.currentTimeMillis();
- outputPipeConnector.close();
+ outputPipeSink.close();
LOGGER.info(
"Pipe: connector subtask {} ({}) was closed within {} ms",
taskID,
- outputPipeConnector,
+ outputPipeSink,
System.currentTimeMillis() - startTime);
} catch (final Exception e) {
LOGGER.info(
@@ -271,8 +271,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
decreaseHighPriorityTaskCount();
}
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop,
regionId);
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop,
regionId);
}
}
@@ -302,68 +302,68 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
public int getAsyncConnectorRetryEventQueueSize() {
- return outputPipeConnector instanceof IoTDBDataRegionAsyncSink
- ? ((IoTDBDataRegionAsyncSink)
outputPipeConnector).getRetryEventQueueSize()
+ return outputPipeSink instanceof IoTDBDataRegionAsyncSink
+ ? ((IoTDBDataRegionAsyncSink) outputPipeSink).getRetryEventQueueSize()
: 0;
}
public int getPendingHandlersSize() {
- return outputPipeConnector instanceof IoTDBDataRegionAsyncSink
- ? ((IoTDBDataRegionAsyncSink)
outputPipeConnector).getPendingHandlersSize()
+ return outputPipeSink instanceof IoTDBDataRegionAsyncSink
+ ? ((IoTDBDataRegionAsyncSink) outputPipeSink).getPendingHandlersSize()
: 0;
}
public int getBatchSize() {
- if (outputPipeConnector instanceof IoTDBDataRegionAsyncSink) {
- return ((IoTDBDataRegionAsyncSink) outputPipeConnector).getBatchSize();
+ if (outputPipeSink instanceof IoTDBDataRegionAsyncSink) {
+ return ((IoTDBDataRegionAsyncSink) outputPipeSink).getBatchSize();
}
- if (outputPipeConnector instanceof IoTDBDataRegionSyncSink) {
- return ((IoTDBDataRegionSyncSink) outputPipeConnector).getBatchSize();
+ if (outputPipeSink instanceof IoTDBDataRegionSyncSink) {
+ return ((IoTDBDataRegionSyncSink) outputPipeSink).getBatchSize();
}
return 0;
}
public double getTotalUncompressedSize() {
- return outputPipeConnector instanceof IoTDBSink
- ? ((IoTDBSink) outputPipeConnector).getTotalUncompressedSize()
+ return outputPipeSink instanceof IoTDBSink
+ ? ((IoTDBSink) outputPipeSink).getTotalUncompressedSize()
: 0;
}
public double getTotalCompressedSize() {
- return outputPipeConnector instanceof IoTDBSink
- ? ((IoTDBSink) outputPipeConnector).getTotalCompressedSize()
+ return outputPipeSink instanceof IoTDBSink
+ ? ((IoTDBSink) outputPipeSink).getTotalCompressedSize()
: 0;
}
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink)
outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeSink).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
}
}
public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink)
outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeSink).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
}
}
public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink) outputPipeConnector)
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeSink)
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
}
}
public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink) outputPipeConnector)
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeSink)
.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
}
}
public void setEventSizeHistogram(Histogram eventSizeHistogram) {
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink)
outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeSink).setBatchEventSizeHistogram(eventSizeHistogram);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index e25cf6be67f..c2a5575e4ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -132,10 +132,8 @@ public class PipeSinkSubtaskManager {
for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
final PipeConnector pipeConnector =
isDataRegionConnector
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters)
- : PipeDataNodeAgent.plugin()
- .schemaRegion()
- .reflectConnector(pipeConnectorParameters);
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
// 1. Construct, validate and customize PipeConnector, and then
handshake (create
// connection) with the target
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 1f5ded9fb5f..cedbea1a13c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -31,7 +31,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
@@ -310,8 +310,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
return;
}
- final PipeTaskExtractorRuntimeEnvironment environment =
- (PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ final PipeTaskSourceRuntimeEnvironment environment =
+ (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index ce99a182e0b..7c547e94407 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,8 +201,8 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
public void customize(
final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
- final PipeTaskExtractorRuntimeEnvironment environment =
- (PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ final PipeTaskSourceRuntimeEnvironment environment =
+ (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
index 9e42ede3194..f1eb102b32e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
import org.apache.iotdb.commons.exception.IllegalPathException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
@@ -77,7 +77,7 @@ public class MQTTPublishHandler extends
AbstractInterceptHandler {
public MQTTPublishHandler(
final PayloadFormatter payloadFormat,
- final PipeTaskExtractorRuntimeEnvironment environment,
+ final PipeTaskSourceRuntimeEnvironment environment,
final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue) {
this.payloadFormat = payloadFormat;
useTableInsert =
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
index a754fc536af..f4d289b0ef6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
@@ -165,8 +165,8 @@ public class MQTTSource implements PipeExtractor {
public void customize(
final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
- final PipeTaskExtractorRuntimeEnvironment environment =
- (PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+ final PipeTaskSourceRuntimeEnvironment environment =
+ (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 9e57532ad26..4d3a607d61f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2104,7 +2104,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.containsKey(PipeSourceConstant.SOURCE_KEY)
|| alterPipeStatement.isReplaceAllExtractorAttributes()) {
checkIfSourcePluginChanged(
- pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
+ pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
new PipeParameters(alterPipeStatement.getExtractorAttributes()));
}
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
@@ -2114,18 +2114,18 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
onlyContainsUser(alterPipeStatement.getExtractorAttributes());
pipeMetaFromCoordinator
.getStaticMeta()
- .getExtractorParameters()
+ .getSourceParameters()
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeStatement.getExtractorAttributes()));
extractorAttributes =
-
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
if (onlyContainsUser) {
checkSourceType(alterPipeStatement.getPipeName(),
extractorAttributes);
}
}
} else {
extractorAttributes =
-
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
}
if (!alterPipeStatement.getProcessorAttributes().isEmpty()) {
@@ -2153,18 +2153,18 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
onlyContainsUser(alterPipeStatement.getConnectorAttributes());
pipeMetaFromCoordinator
.getStaticMeta()
- .getConnectorParameters()
+ .getSinkParameters()
.addOrReplaceEquivalentAttributes(
new
PipeParameters(alterPipeStatement.getConnectorAttributes()));
connectorAttributes =
-
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
if (onlyContainsUser) {
checkSinkType(alterPipeStatement.getPipeName(),
connectorAttributes);
}
}
} else {
connectorAttributes =
-
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
}
PipeDataNodeAgent.plugin()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 7e19a267dc5..6fb2809c053 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -102,7 +102,7 @@ public class SubscriptionSinkSubtaskManager {
: new UnboundedBlockingPendingQueue<>(new
PipeDataRegionEventCounter());
final PipeConnector pipeConnector =
-
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
+
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters);
// 1. Construct, validate and customize PipeConnector, and then
handshake (create connection)
// with the target
try {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index 84de82206fd..5d4d6a640b6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -94,7 +94,7 @@ public class PipeDataNodePluginAgentTest {
IoTDBDataRegionSource.class,
agent
.dataRegion()
- .reflectExtractor(
+ .reflectSource(
new PipeParameters(
new HashMap<String, String>() {
{
@@ -122,7 +122,7 @@ public class PipeDataNodePluginAgentTest {
IoTDBDataRegionAsyncSink.class,
agent
.dataRegion()
- .reflectConnector(
+ .reflectSink(
new PipeParameters(
new HashMap<String, String>() {
{
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 6064de61973..be9dfadc4f9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -27,7 +27,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
@@ -265,7 +265,7 @@ public class DeletionResourceTest {
});
final PipeTaskRuntimeConfiguration configuration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
"1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null));
extractor.customize(parameters, configuration);
Assert.assertTrue(extractor.shouldExtractDeletion());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 32af67855be..61d5232a3d8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -103,7 +103,7 @@ public class CachedSchemaPatternMatcherTest {
put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, "root");
}
}),
- new PipeTaskRuntimeConfiguration(new
PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+ new PipeTaskRuntimeConfiguration(new
PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
extractors.add(dataRegionExtractor);
final int deviceExtractorNum = 10;
@@ -118,8 +118,7 @@ public class CachedSchemaPatternMatcherTest {
put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, "root.db" +
finalI1);
}
}),
- new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+ new PipeTaskRuntimeConfiguration(new
PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
extractors.add(deviceExtractor);
for (int j = 0; j < seriesExtractorNum; j++) {
final PipeRealtimeDataRegionSource seriesExtractor = new
PipeRealtimeDataRegionFakeSource();
@@ -135,7 +134,7 @@ public class CachedSchemaPatternMatcherTest {
}
}),
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+ new PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
extractors.add(seriesExtractor);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 41e6f04fc08..59aec4b1f7b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
@@ -150,28 +150,28 @@ public class PipeRealtimeExtractTest {
final PipeTaskRuntimeConfiguration configuration0 =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
"1",
1,
Integer.parseInt(dataRegion1),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration1 =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
"1",
1,
Integer.parseInt(dataRegion1),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration2 =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
"1",
1,
Integer.parseInt(dataRegion2),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
final PipeTaskRuntimeConfiguration configuration3 =
new PipeTaskRuntimeConfiguration(
- new PipeTaskExtractorRuntimeEnvironment(
+ new PipeTaskSourceRuntimeEnvironment(
"1",
1,
Integer.parseInt(dataRegion2),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index ff61d5527d6..33acd4beb63 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -69,7 +69,7 @@ public abstract class PipePluginAgent {
protected abstract PipeSinkConstructor createPipeConnectorConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper);
- public final PipeExtractor reflectExtractor(PipeParameters
extractorParameters) {
+ public final PipeExtractor reflectSource(PipeParameters extractorParameters)
{
return pipeExtractorConstructor.reflectPlugin(extractorParameters);
}
@@ -77,7 +77,7 @@ public abstract class PipePluginAgent {
return pipeProcessorConstructor.reflectPlugin(processorParameters);
}
- public final PipeConnector reflectConnector(PipeParameters
connectorParameters) {
+ public final PipeConnector reflectSink(PipeParameters connectorParameters) {
return pipeSinkConstructor.reflectPlugin(connectorParameters);
}
@@ -95,7 +95,7 @@ public abstract class PipePluginAgent {
protected PipeExtractor validateExtractor(Map<String, String>
extractorAttributes)
throws Exception {
final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
- final PipeExtractor temporaryExtractor =
reflectExtractor(extractorParameters);
+ final PipeExtractor temporaryExtractor =
reflectSource(extractorParameters);
try {
temporaryExtractor.validate(new
PipeParameterValidator(extractorParameters));
} finally {
@@ -127,7 +127,7 @@ public abstract class PipePluginAgent {
protected PipeConnector validateConnector(
String pipeName, Map<String, String> connectorAttributes) throws
Exception {
final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
- final PipeConnector temporaryConnector =
reflectConnector(connectorParameters);
+ final PipeConnector temporaryConnector = reflectSink(connectorParameters);
try {
temporaryConnector.validate(new
PipeParameterValidator(connectorParameters));
temporaryConnector.customize(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 10c92eb2695..612704318aa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -480,9 +480,9 @@ public abstract class PipeTaskAgent {
calculateMemoryUsage(
pipeMetaFromCoordinator.getStaticMeta(),
- pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
+ pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
- pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters());
+ pipeMetaFromCoordinator.getStaticMeta().getSinkParameters());
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta != null) {
@@ -1012,7 +1012,7 @@ public abstract class PipeTaskAgent {
for (final PipeRuntimeException e :
pipeTaskMeta.getExceptionMessages()) {
if (e instanceof PipeRuntimeSinkCriticalException) {
reusedConnectorParameters2ExceptionMap.putIfAbsent(
- staticMeta.getConnectorParameters(),
+ staticMeta.getSinkParameters(),
(PipeRuntimeSinkCriticalException) e);
}
}
@@ -1032,13 +1032,13 @@ public abstract class PipeTaskAgent {
pipeTaskMeta -> {
if (pipeTaskMeta.getLeaderNodeId() == currentNodeId
&&
reusedConnectorParameters2ExceptionMap.containsKey(
- staticMeta.getConnectorParameters())
+ staticMeta.getSinkParameters())
&& !pipeTaskMeta.containsExceptionMessage(
reusedConnectorParameters2ExceptionMap.get(
- staticMeta.getConnectorParameters()))) {
+ staticMeta.getSinkParameters()))) {
final PipeRuntimeSinkCriticalException exception =
reusedConnectorParameters2ExceptionMap.get(
- staticMeta.getConnectorParameters());
+ staticMeta.getSinkParameters());
pipeTaskMeta.trackExceptionMessage(exception);
LOGGER.warn(
"Pipe {} (creation time = {}) will be stopped
because of critical exception "
@@ -1046,7 +1046,7 @@ public abstract class PipeTaskAgent {
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
exception.getTimeStamp(),
- staticMeta.getConnectorParameters());
+ staticMeta.getSinkParameters());
}
});
});
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
index 3fa94cf9870..4b57cd64e43 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
@@ -43,9 +43,9 @@ public class PipeStaticMeta {
private String pipeName;
private long creationTime;
- private PipeParameters extractorParameters;
+ private PipeParameters sourceParameters;
private PipeParameters processorParameters;
- private PipeParameters connectorParameters;
+ private PipeParameters sinkParameters;
private PipeStaticMeta() {
// Empty constructor
@@ -59,9 +59,9 @@ public class PipeStaticMeta {
final Map<String, String> connectorAttributes) {
this.pipeName = pipeName;
this.creationTime = creationTime;
- extractorParameters = new PipeParameters(extractorAttributes);
+ sourceParameters = new PipeParameters(extractorAttributes);
processorParameters = new PipeParameters(processorAttributes);
- connectorParameters = new PipeParameters(connectorAttributes);
+ sinkParameters = new PipeParameters(connectorAttributes);
}
public String getPipeName() {
@@ -72,16 +72,16 @@ public class PipeStaticMeta {
return creationTime;
}
- public PipeParameters getExtractorParameters() {
- return extractorParameters;
+ public PipeParameters getSourceParameters() {
+ return sourceParameters;
}
public PipeParameters getProcessorParameters() {
return processorParameters;
}
- public PipeParameters getConnectorParameters() {
- return connectorParameters;
+ public PipeParameters getSinkParameters() {
+ return sinkParameters;
}
public PipeType getPipeType() {
@@ -90,7 +90,7 @@ public class PipeStaticMeta {
public boolean isSourceExternal() {
return !BuiltinPipePlugin.BUILTIN_SOURCES.contains(
- extractorParameters
+ sourceParameters
.getStringOrDefault(
Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
@@ -108,8 +108,8 @@ public class PipeStaticMeta {
ReadWriteIOUtils.write(pipeName, outputStream);
ReadWriteIOUtils.write(creationTime, outputStream);
- ReadWriteIOUtils.write(extractorParameters.getAttribute().size(),
outputStream);
- for (final Map.Entry<String, String> entry :
extractorParameters.getAttribute().entrySet()) {
+ ReadWriteIOUtils.write(sourceParameters.getAttribute().size(),
outputStream);
+ for (final Map.Entry<String, String> entry :
sourceParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
@@ -118,8 +118,8 @@ public class PipeStaticMeta {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
- ReadWriteIOUtils.write(connectorParameters.getAttribute().size(),
outputStream);
- for (final Map.Entry<String, String> entry :
connectorParameters.getAttribute().entrySet()) {
+ ReadWriteIOUtils.write(sinkParameters.getAttribute().size(), outputStream);
+ for (final Map.Entry<String, String> entry :
sinkParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
@@ -131,15 +131,15 @@ public class PipeStaticMeta {
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
- pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.sourceParameters = new PipeParameters(new HashMap<>());
pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
- pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.sinkParameters = new PipeParameters(new HashMap<>());
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
final String value = ReadWriteIOUtils.readString(inputStream);
- pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
+ pipeStaticMeta.sourceParameters.getAttribute().put(key, value);
}
size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
@@ -151,7 +151,7 @@ public class PipeStaticMeta {
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
final String value = ReadWriteIOUtils.readString(inputStream);
- pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+ pipeStaticMeta.sinkParameters.getAttribute().put(key, value);
}
return pipeStaticMeta;
@@ -163,15 +163,15 @@ public class PipeStaticMeta {
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
- pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.sourceParameters = new PipeParameters(new HashMap<>());
pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
- pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.sinkParameters = new PipeParameters(new HashMap<>());
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
final String value = ReadWriteIOUtils.readString(byteBuffer);
- pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
+ pipeStaticMeta.sourceParameters.getAttribute().put(key, value);
}
size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
@@ -183,7 +183,7 @@ public class PipeStaticMeta {
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
final String value = ReadWriteIOUtils.readString(byteBuffer);
- pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+ pipeStaticMeta.sinkParameters.getAttribute().put(key, value);
}
return pipeStaticMeta;
@@ -200,15 +200,15 @@ public class PipeStaticMeta {
final PipeStaticMeta that = (PipeStaticMeta) obj;
return pipeName.equals(that.pipeName)
&& creationTime == that.creationTime
- && extractorParameters.equals(that.extractorParameters)
+ && sourceParameters.equals(that.sourceParameters)
&& processorParameters.equals(that.processorParameters)
- && connectorParameters.equals(that.connectorParameters);
+ && sinkParameters.equals(that.sinkParameters);
}
@Override
public int hashCode() {
return Objects.hash(
- pipeName, creationTime, extractorParameters, processorParameters,
connectorParameters);
+ pipeName, creationTime, sourceParameters, processorParameters,
sinkParameters);
}
@Override
@@ -218,12 +218,12 @@ public class PipeStaticMeta {
+ pipeName
+ "', creationTime="
+ creationTime
- + ", extractorParameters="
- + extractorParameters
+ + ", sourceParameters="
+ + sourceParameters
+ ", processorParameters="
+ processorParameters
- + ", connectorParameters="
- + connectorParameters
+ + ", sinkParameters="
+ + sinkParameters
+ "}";
}
@@ -246,7 +246,7 @@ public class PipeStaticMeta {
public boolean visibleUnder(final boolean isTableModel) {
final Visibility visibility =
- VisibilityUtils.calculateFromExtractorParameters(extractorParameters);
+ VisibilityUtils.calculateFromExtractorParameters(sourceParameters);
return VisibilityUtils.isCompatible(visibility, isTableModel);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 62cce7438ba..66d43e0743c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -40,7 +40,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeAbstractSinkSubtask.class);
// For output (transfer events to the target system in connector)
- protected PipeConnector outputPipeConnector;
+ protected PipeConnector outputPipeSink;
// For thread pool to execute callbacks
protected ExecutorService subtaskCallbackListeningExecutor;
@@ -54,9 +54,9 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
protected volatile Event lastExceptionEvent;
protected PipeAbstractSinkSubtask(
- final String taskID, final long creationTime, final PipeConnector
outputPipeConnector) {
+ final String taskID, final long creationTime, final PipeConnector
outputPipeSink) {
super(taskID, creationTime);
- this.outputPipeConnector = outputPipeConnector;
+ this.outputPipeSink = outputPipeSink;
}
@Override
@@ -149,23 +149,23 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
private boolean onPipeConnectionException(final Throwable throwable) {
LOGGER.warn(
"PipeConnectionException occurred, {} retries to handshake with the
target system.",
- outputPipeConnector.getClass().getName(),
+ outputPipeSink.getClass().getName(),
throwable);
int retry = 0;
while (retry < MAX_RETRY_TIMES) {
try {
- outputPipeConnector.handshake();
+ outputPipeSink.handshake();
LOGGER.info(
"{} handshakes with the target system successfully.",
- outputPipeConnector.getClass().getName());
+ outputPipeSink.getClass().getName());
break;
} catch (final Exception e) {
retry++;
LOGGER.warn(
"{} failed to handshake with the target system for {} times, "
+ "will retry at most {} times.",
- outputPipeConnector.getClass().getName(),
+ outputPipeSink.getClass().getName(),
retry,
MAX_RETRY_TIMES,
e);
@@ -193,7 +193,7 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
+ "stopping current subtask {} (creation time: {}, simple class:
{}). "
+ "Status shown when query the pipe will be 'STOPPED'. "
+ "Please restart the task by executing 'START PIPE' manually if
needed.",
- outputPipeConnector.getClass().getName(),
+ outputPipeSink.getClass().getName(),
MAX_RETRY_TIMES,
taskID,
creationTime,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
similarity index 81%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
index c546ab47278..cfecf9c231a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-public class PipeTaskExtractorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+public class PipeTaskSourceRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
private final PipeTaskMeta pipeTaskMeta;
- public PipeTaskExtractorRuntimeEnvironment(
- String pipeName, long creationTime, int regionId, PipeTaskMeta
pipeTaskMeta) {
+ public PipeTaskSourceRuntimeEnvironment(
+ final String pipeName,
+ final long creationTime,
+ final int regionId,
+ final PipeTaskMeta pipeTaskMeta) {
super(pipeName, creationTime, regionId);
this.pipeTaskMeta = pipeTaskMeta;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 65bb977a624..c16f2310629 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.source;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.pipe.api.PipeExtractor;
@@ -149,8 +149,8 @@ public abstract class IoTDBSource implements PipeExtractor {
public void customize(
final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
- final PipeTaskExtractorRuntimeEnvironment environment =
- ((PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment());
+ final PipeTaskSourceRuntimeEnvironment environment =
+ ((PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment());
regionId = environment.getRegionId();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();