This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 81ec29cf409 [To dev/1.3] Pipe: Fixed the semantics for new regions &
realtime / history only pipes (#16641)
81ec29cf409 is described below
commit 81ec29cf409099315663621649f323d55ee184a7
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 23 20:41:23 2025 +0800
[To dev/1.3] Pipe: Fixed the semantics for new regions & realtime / history
only pipes (#16641)
---
.../manager/partition/PartitionManager.java | 8 +--
.../confignode/persistence/pipe/PipeTaskInfo.java | 14 ++++-
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 19 ++++++
.../impl/pipe/task/AlterPipeProcedureV2.java | 39 ++++++++++--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 12 +---
.../task/builder/PipeDataNodeTaskBuilder.java | 69 ++++++++++------------
.../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +-
.../source/dataregion/IoTDBDataRegionSource.java | 44 ++++++--------
.../PipeHistoricalDataRegionTsFileSource.java | 15 +----
.../subtask/SubscriptionSinkSubtaskManager.java | 2 +-
.../commons/pipe/agent/task/PipeTaskAgent.java | 38 ++++++++++++
.../commons/pipe/agent/task/meta/PipeTaskMeta.java | 39 +++++++++---
.../pipe/config/constant/SystemConstant.java | 3 +-
13 files changed, 194 insertions(+), 110 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 632a5b59419..e186be44354 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -955,14 +955,10 @@ public class PartitionManager {
}
public Optional<TConsensusGroupId> generateTConsensusGroupIdByRegionId(final
int regionId) {
- if (configManager
- .getPartitionManager()
- .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
+ if (isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
return Optional.of(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId));
}
- if (configManager
- .getPartitionManager()
- .isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
+ if (isRegionGroupExists(new
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
return Optional.of(new TConsensusGroupId(TConsensusGroupType.DataRegion,
regionId));
}
String msg =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 3df50fbf3b7..08f96a25a61 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.confignode.persistence.pipe;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -610,13 +612,21 @@ public class PipeTaskInfo implements SnapshotProcessor {
} else {
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
}
- } else {
+ } else if (!PipeTaskAgent.isHistoryOnlyPipe(
+
pipeMeta.getStaticMeta().getExtractorParameters())
+ || !consensusGroupId
+ .getType()
+ .equals(TConsensusGroupType.DataRegion)) {
// If CN does not contain the region group, it
means the data
// region group is newly added.
+ // History-only pipes are not handled for newly added data regions
+
+ // Newly added leader
if (newLeader != -1) {
consensusGroupIdToTaskMetaMap.put(
consensusGroupId.getId(),
- new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
+ new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)
+ .markAsNewlyAdded());
}
// else:
// "The pipe task meta does not contain the data
region group {} or
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index b860d7fcb82..857d5733f6a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -29,6 +30,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProce
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -41,6 +43,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -526,6 +529,22 @@ public abstract class AbstractOperatePipeProcedureV2
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
}
+ protected Map<Integer, TPushPipeMetaResp>
pushSinglePipeMetaToDataNodes4Realtime(
+ String pipeName, ConfigNodeProcedureEnv env) throws IOException {
+ final PipeMeta pipeMeta =
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName);
+ // Note: although the altered pipe already carries progress, once it is altered to
+ // realtime-only the previously existing data must be ignored
+ pipeMeta
+ .getStaticMeta()
+ .getExtractorParameters()
+ .addOrReplaceEquivalentAttributes(
+ new PipeParameters(
+ Collections.singletonMap(
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
Boolean.FALSE.toString())));
+ return env.pushSinglePipeMetaToDataNodes(
+ pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+ }
+
/**
* Drop a pipe on all the dataNodes.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index b11c74408a6..5a7b9b8a5b2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
@@ -137,6 +139,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
final ConcurrentMap<Integer, PipeTaskMeta>
updatedConsensusGroupIdToTaskMetaMap =
new ConcurrentHashMap<>();
+
// data regions & schema regions
env.getConfigManager()
.getLoadManager()
@@ -151,11 +154,33 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE +
".")
&& currentPipeTaskMeta != null
- && currentPipeTaskMeta.getLeaderNodeId() ==
regionLeaderNodeId) {
+ && currentPipeTaskMeta.getLeaderNodeId() ==
regionLeaderNodeId
+ && !(PipeTaskAgent.isHistoryOnlyPipe(
+ currentPipeStaticMeta.getExtractorParameters())
+ && PipeTaskAgent.isHistoryOnlyPipe(
+ updatedPipeStaticMeta.getExtractorParameters())
+ && regionGroupId.getType() ==
TConsensusGroupType.DataRegion
+ && currentPipeTaskMeta.isNewlyAdded())) {
// Pipes only collect user data, so the metric database is filtered out here.
+ // If it is altered to "pure historical", then the regionIds
are always new here,
+ // then it will extract all existing data now, not existing
data since the
+ // original pipe was created
+ // Similar for "pure realtime"
updatedConsensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
- new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(),
regionLeaderNodeId));
+ new PipeTaskMeta(
+ currentPipeTaskMeta.getProgressIndex(),
+
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
+ && !(!PipeTaskAgent.isHistoryOnlyPipe(
+
currentPipeStaticMeta.getExtractorParameters())
+ && PipeTaskAgent.isHistoryOnlyPipe(
+
updatedPipeStaticMeta.getExtractorParameters()))
+ && !(!PipeTaskAgent.isRealtimeOnlyPipe(
+
currentPipeStaticMeta.getExtractorParameters())
+ && PipeTaskAgent.isRealtimeOnlyPipe(
+
updatedPipeStaticMeta.getExtractorParameters()))
+ ?
PipeTaskMeta.getRevertedLeader(regionLeaderNodeId)
+ : regionLeaderNodeId));
}
});
@@ -209,8 +234,14 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
final String pipeName = alterPipeRequest.getPipeName();
LOGGER.info("AlterPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- String exceptionMessage =
- parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
+ final String exceptionMessage =
+ parsePushPipeMetaExceptionForPipe(
+ pipeName,
+
!PipeTaskAgent.isRealtimeOnlyPipe(currentPipeStaticMeta.getExtractorParameters())
+ && PipeTaskAgent.isRealtimeOnlyPipe(
+ updatedPipeStaticMeta.getExtractorParameters())
+ ? pushSinglePipeMetaToDataNodes4Realtime(pipeName, env)
+ : pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
LOGGER.warn(
"Failed to alter pipe {}, details: {}, metadata will be synchronized
later.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f916ac067c8..7070d57d3b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -603,14 +603,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return isHistoryEnable && isRealtimeEnable;
}
- private boolean isSnapshotMode(final PipeParameters parameters) {
- final String sourceModeValue =
- parameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
- return sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
- || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
- }
-
@Override
public void runPipeTasks(
final Collection<PipeTask> pipeTasks, final Consumer<PipeTask>
runSingle) {
@@ -789,7 +781,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is not history, we do not need to allocate memory
boolean isExtractorHistory =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -873,7 +865,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source is history enable, we need to transfer tsfile
boolean needTransferTsFile =
sourceParameters.getBooleanOrDefault(
- SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 21f344f173d..2edc5b943f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.builder;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
@@ -48,13 +49,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_FORMAT_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
public class PipeDataNodeTaskBuilder {
@@ -79,56 +75,56 @@ public class PipeDataNodeTaskBuilder {
}
public PipeDataNodeTask build() {
- // Event flow: extractor -> processor -> connector
+ // Event flow: source -> processor -> sink
// Analyzes the PipeParameters to identify potential conflicts.
- final PipeParameters extractorParameters =
+ final PipeParameters sourceParameters =
blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
- final PipeParameters connectorParameters =
+ final PipeParameters sinkParameters =
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
- checkConflict(extractorParameters, connectorParameters);
+ checkConflict(sourceParameters, sinkParameters);
- // We first build the extractor and connector, then build the processor.
- final PipeTaskSourceStage extractorStage =
+ // We first build the source and sink, then build the processor.
+ final PipeTaskSourceStage sourceStage =
new PipeTaskSourceStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- extractorParameters,
+ sourceParameters,
regionId,
pipeTaskMeta);
- final PipeTaskSinkStage connectorStage;
+ final PipeTaskSinkStage sinkStage;
final PipeType pipeType = pipeStaticMeta.getPipeType();
if (PipeType.SUBSCRIPTION.equals(pipeType)) {
- connectorStage =
+ sinkStage =
new SubscriptionTaskSinkStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- connectorParameters,
+ sinkParameters,
regionId,
PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
} else { // user pipe or consensus pipe
- connectorStage =
+ sinkStage =
new PipeTaskSinkStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- connectorParameters,
+ sinkParameters,
regionId,
pipeType.equals(PipeType.USER)
?
PipeSubtaskExecutorManager.getInstance().getConnectorExecutorSupplier()
:
PipeSubtaskExecutorManager.getInstance().getConsensusExecutorSupplier());
}
- // The processor connects the extractor and connector.
+ // The processor connects the source and sink.
final PipeTaskProcessorStage processorStage =
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
regionId,
- extractorStage.getEventSupplier(),
- connectorStage.getPipeConnectorPendingQueue(),
+ sourceStage.getEventSupplier(),
+ sinkStage.getPipeConnectorPendingQueue(),
PROCESSOR_EXECUTOR,
pipeTaskMeta,
pipeStaticMeta
@@ -140,12 +136,13 @@ public class PipeDataNodeTaskBuilder {
PipeType.SUBSCRIPTION.equals(pipeType));
return new PipeDataNodeTask(
- pipeStaticMeta.getPipeName(), regionId, extractorStage,
processorStage, connectorStage);
+ pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage,
sinkStage);
}
private void generateSystemParameters() {
- if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)) {
- systemParameters.put(SystemConstant.RESTART_KEY,
Boolean.TRUE.toString());
+ if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
+ || pipeTaskMeta.isNewlyAdded()) {
+ systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
Boolean.TRUE.toString());
}
}
@@ -158,21 +155,15 @@ public class PipeDataNodeTaskBuilder {
}
private void checkConflict(
- final PipeParameters extractorParameters, final PipeParameters
connectorParameters) {
+ final PipeParameters sourceParameters, final PipeParameters
sinkParameters) {
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
try {
insertionDeletionListeningOptionPair =
-
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
-
- final String extractorModeValue =
- extractorParameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
+
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters);
shouldTerminatePipeOnAllHistoricalEventsConsumed =
- extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
- ||
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-
+ PipeTaskAgent.isSnapshotMode(sourceParameters);
} catch (final IllegalPathException e) {
LOGGER.warn(
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}",
@@ -184,17 +175,17 @@ public class PipeDataNodeTaskBuilder {
if (insertionDeletionListeningOptionPair.right
|| shouldTerminatePipeOnAllHistoricalEventsConsumed) {
final Boolean isRealtime =
- connectorParameters.getBooleanByKeys(
+ sinkParameters.getBooleanByKeys(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
if (isRealtime == null) {
-
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
"false");
+
sinkParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
"false");
if (insertionDeletionListeningOptionPair.right) {
LOGGER.info(
"PipeDataNodeTaskBuilder: When 'inclusion' contains
'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues
after deletion.");
} else {
LOGGER.info(
- "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' is defaulted to 'false' to prevent premature halt before
transfer completion.");
+ "PipeDataNodeTaskBuilder: When source uses snapshot model,
'realtime-first' is defaulted to 'false' to prevent premature halt before
transfer completion.");
}
} else if (isRealtime) {
if (insertionDeletionListeningOptionPair.right) {
@@ -202,24 +193,24 @@ public class PipeDataNodeTaskBuilder {
"PipeDataNodeTaskBuilder: When 'inclusion' includes
'data.delete', 'realtime-first' set to 'true' may result in data
synchronization issues after deletion.");
} else {
LOGGER.warn(
- "PipeDataNodeTaskBuilder: When extractor uses snapshot model,
'realtime-first' set to 'true' may cause prevent premature halt before transfer
completion.");
+ "PipeDataNodeTaskBuilder: When source uses snapshot model,
'realtime-first' set to 'true' may cause prevent premature halt before transfer
completion.");
}
}
}
final boolean isRealtimeEnabled =
- extractorParameters.getBooleanOrDefault(
+ sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
if (isRealtimeEnabled &&
!shouldTerminatePipeOnAllHistoricalEventsConsumed) {
final Boolean enableSendTsFileLimit =
- connectorParameters.getBooleanByKeys(
+ sinkParameters.getBooleanByKeys(
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
if (enableSendTsFileLimit == null) {
-
connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
"true");
+
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
"true");
LOGGER.info(
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we
enable rate limiter in sending tsfile by default to reserve disk and network IO
for realtime sending.");
} else if (!enableSendTsFileLimit) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index e2743147705..97859938828 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -257,7 +257,7 @@ public class PipeSinkSubtaskManager {
private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
- sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+ sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index e7ff984358a..ec99ed2abaf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.source.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.pipe.source.IoTDBSource;
@@ -58,10 +59,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
@@ -81,7 +78,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_MODE_KEY;
@@ -123,7 +119,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
"The pipe cannot transfer data when data region is using ratis
consensus.");
}
- // Validate extractor.pattern.format is within valid range
+ // Validate source.pattern.format is within valid range
validator
.validateAttributeValueRange(
EXTRACTOR_PATTERN_FORMAT_KEY,
@@ -143,7 +139,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
// Check whether the pattern is legal
validatePattern(pattern);
- // Validate extractor.history.enable and extractor.realtime.enable
+ // Validate source.history.enable and source.realtime.enable
validator
.validateAttributeValueRange(
EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
@@ -167,7 +163,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
- // Validate extractor.realtime.mode
+ // Validate source.realtime.mode
if (validator
.getParameters()
.getBooleanOrDefault(
@@ -245,28 +241,22 @@ public class IoTDBDataRegionSource extends IoTDBSource {
}
private void constructRealtimeExtractor(final PipeParameters parameters) {
- // Use heartbeat only extractor if disable realtime extractor
+ // Use heartbeat only source if disable realtime source
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
LOGGER.info(
- "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
- EXTRACTOR_REALTIME_ENABLE_KEY);
+ "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.",
+ EXTRACTOR_REALTIME_ENABLE_KEY,
+ SOURCE_REALTIME_ENABLE_KEY);
return;
}
- // Use heartbeat only extractor if enable snapshot mode
- final String extractorModeValue =
- parameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
- if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
- || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE))
{
+ // Use heartbeat only source if enable snapshot mode
+ if (PipeTaskAgent.isSnapshotMode(parameters)) {
realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
- LOGGER.info(
- "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
- EXTRACTOR_MODE_KEY,
- EXTRACTOR_MODE_SNAPSHOT_VALUE);
+ LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime
source.");
return;
}
@@ -295,7 +285,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
- "Pipe: Unsupported extractor realtime mode: {}, create a hybrid
extractor.",
+ "Pipe: Unsupported source realtime mode: {}, create a hybrid
source.",
parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY));
}
}
@@ -345,7 +335,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
final long startTime = System.currentTimeMillis();
LOGGER.info(
- "Pipe {}@{}: Starting historical extractor {} and realtime extractor
{}.",
+ "Pipe {}@{}: Starting historical source {} and realtime source {}.",
pipeName,
regionId,
historicalExtractor.getClass().getSimpleName(),
@@ -356,7 +346,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
final AtomicReference<Exception> exceptionHolder = new
AtomicReference<>(null);
final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
while (true) {
- // try to start extractors in the data region ...
+ // try to start sources in the data region ...
// First try to run if the data region exists, then try to run if the data region does not exist.
// It is uncommon for both attempts to fail; that means the data region was created between the
// runIfPresent and runIfAbsent operations. In this case, we need to retry.
@@ -379,7 +369,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
rethrowExceptionIfAny(exceptionHolder);
LOGGER.info(
- "Pipe {}@{}: Started historical extractor {} and realtime
extractor {} successfully within {} ms.",
+ "Pipe {}@{}: Started historical source {} and realtime source {}
successfully within {} ms.",
pipeName,
regionId,
historicalExtractor.getClass().getSimpleName(),
@@ -404,7 +394,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
} catch (final Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
- "Pipe {}@{}: Start historical extractor {} and realtime extractor {}
error.",
+ "Pipe {}@{}: Start historical source {} and realtime source {}
error.",
pipeName,
regionId,
historicalExtractor.getClass().getSimpleName(),
@@ -415,7 +405,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
private void rethrowExceptionIfAny(final AtomicReference<Exception>
exceptionHolder) {
if (exceptionHolder.get() != null) {
- throw new PipeException("failed to start extractors.",
exceptionHolder.get());
+ throw new PipeException("failed to start sources.",
exceptionHolder.get());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 0ed563d52b5..91f5be62f11 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -78,10 +79,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_START_TIME_KEY;
@@ -91,7 +88,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY;
@@ -216,7 +212,7 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
// enabling the historical data extraction, which may affect the realtime data extraction.
isHistoricalSourceEnabled =
parameters.getBooleanOrDefault(
- SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
+ SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
|| parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -289,12 +285,7 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
|| // Should extract deletion
listeningOptionPair.getRight());
- final String extractorModeValue =
- parameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
- shouldTerminatePipeOnAllHistoricalEventsConsumed =
- extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
- || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+ shouldTerminatePipeOnAllHistoricalEventsConsumed =
PipeTaskAgent.isSnapshotMode(parameters);
isForwardingPipeRequests =
parameters.getBooleanOrDefault(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 7e19a267dc5..959870898a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -210,7 +210,7 @@ public class SubscriptionSinkSubtaskManager {
private String generateAttributeSortedString(final PipeParameters
pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
- sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+ sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index b695b8f6ad3..726bc1b6a1f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -63,6 +64,18 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
+
/**
* State transition diagram of a pipe task:
*
@@ -1095,6 +1108,31 @@ public abstract class PipeTaskAgent {
public abstract void runPipeTasks(
final Collection<PipeTask> pipeTasks, final Consumer<PipeTask>
runSingle);
+ public static boolean isHistoryOnlyPipe(final PipeParameters parameters) {
+ return isSnapshotMode(parameters)
+ || !parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+ }
+
+ public static boolean isRealtimeOnlyPipe(final PipeParameters parameters) {
+ return !isSnapshotMode(parameters)
+ && !parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+ }
+
+ public static boolean isSnapshotMode(final PipeParameters parameters) {
+ final boolean isSnapshotMode;
+ final String sourceModeValue =
+ parameters.getStringOrDefault(
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
+ isSnapshotMode =
+ sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+ || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+ return isSnapshotMode;
+ }
+
///////////////////////// Maintain meta info /////////////////////////
public long getPipeCreationTime(final String pipeName) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index bf9a1869246..9584ca8cbab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
private final AtomicReference<ProgressIndex> progressIndex = new
AtomicReference<>();
+
+ // This is -2 - leaderNodeId iff it's a newly added region with internal source.
private final AtomicInteger leaderNodeId = new AtomicInteger(0);
/**
@@ -63,21 +65,44 @@ public class PipeTaskMeta {
this.leaderNodeId.set(leaderNodeId);
}
- public ProgressIndex getProgressIndex() {
- return progressIndex.get();
+ ///////////////////////// Region old & new test /////////////////////////
+
+ public PipeTaskMeta markAsNewlyAdded() {
+ leaderNodeId.getAndUpdate(PipeTaskMeta::getRevertedLeader);
+ return this;
}
- public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- return progressIndex.updateAndGet(
- index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+ public boolean isNewlyAdded() {
+ return isNewlyAdded(leaderNodeId.get());
}
public int getLeaderNodeId() {
- return leaderNodeId.get();
+ final int result = leaderNodeId.get();
+ return isNewlyAdded(result) ? getRevertedLeader(result) : result;
}
public void setLeaderNodeId(final int leaderNodeId) {
- this.leaderNodeId.set(leaderNodeId);
+ this.leaderNodeId.updateAndGet(
+ leaderId -> isNewlyAdded(leaderId) ? getRevertedLeader(leaderNodeId) :
leaderNodeId);
+ }
+
+ public static int getRevertedLeader(final int leaderNodeId) {
+ return -2 - leaderNodeId;
+ }
+
+ public static boolean isNewlyAdded(final int leaderNodeId) {
+ return leaderNodeId < -1;
+ }
+
+ ///////////////////////// Normal /////////////////////////
+
+ public ProgressIndex getProgressIndex() {
+ return progressIndex.get();
+ }
+
+ public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
+ return progressIndex.updateAndGet(
+ index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
}
public synchronized Iterable<PipeRuntimeException> getExceptionMessages() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index c77c56fb9c0..9c1bc1b521c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.commons.pipe.config.constant;
public class SystemConstant {
- public static final String RESTART_KEY = "__system.restart";
+ // This can be arbitrarily changed since it's only a memory key and not stored
+ public static final String RESTART_OR_NEWLY_ADDED_KEY =
"__system.restart_or_newly_added";
public static final boolean RESTART_DEFAULT_VALUE = false;
private SystemConstant() {