This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 89af73d2088 Pipe: Smoothed the rate in pipe's remaining time
calculations (#12699)
89af73d2088 is described below
commit 89af73d2088fdde6adf7a48713cd1e7312cfe218
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 17 11:43:49 2024 +0800
Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)
---
.../java/org/apache/iotdb/CountPointProcessor.java | 14 +-
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 10 ++
.../pipe/event/PipeConfigRegionSnapshotEvent.java | 13 +-
.../pipe/event/PipeConfigRegionWritePlanEvent.java | 8 +-
.../metric/PipeConfigNodeRemainingTimeMetrics.java | 34 +++--
.../PipeConfigNodeRemainingTimeOperator.java | 99 +++++++------
.../manager/pipe/task/PipeConfigNodeTaskStage.java | 16 +--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 11 ++
.../db/pipe/event/UserDefinedEnrichedEvent.java | 23 +--
.../event/common/heartbeat/PipeHeartbeatEvent.java | 15 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 1 +
.../schema/PipeSchemaRegionSnapshotEvent.java | 14 +-
.../schema/PipeSchemaRegionWritePlanEvent.java | 8 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 66 +++++----
.../common/tablet/PipeRawTabletInsertionEvent.java | 75 +++++-----
.../event/common/terminate/PipeTerminateEvent.java | 10 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 52 +++----
.../tsfile/TsFileInsertionDataContainer.java | 2 +
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 11 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 6 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 7 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 46 +++---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 158 ++++++++++++---------
.../twostage/plugin/TwoStageCountProcessor.java | 2 +-
.../pipe/task/connection/PipeEventCollector.java | 1 +
iotdb-core/node-commons/pom.xml | 4 +
.../apache/iotdb/commons/conf/CommonConfig.java | 25 +++-
.../iotdb/commons/conf/CommonDescriptor.java | 16 ++-
.../enums/PipeRemainingTimeRateAverageTime.java | 49 +++++++
.../commons/pipe/agent/task/PipeTaskAgent.java | 12 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 15 +-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 32 +++--
.../commons/pipe/event/PipeSnapshotEvent.java | 3 +-
.../commons/pipe/event/PipeWritePlanEvent.java | 3 +-
.../commons/pipe/event/ProgressReportEvent.java | 7 +-
.../extractor/IoTDBNonDataRegionExtractor.java | 11 +-
.../commons/pipe/metric/PipeRemainingOperator.java | 87 ++++++++++++
.../pipe/progress/PipeEventCommitManager.java | 43 +++---
39 files changed, 672 insertions(+), 339 deletions(-)
diff --git
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 838090e7a3a..1e13b27b3a3 100644
---
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -44,26 +44,28 @@ public class CountPointProcessor implements PipeProcessor {
private PartialPath aggregateSeries;
@Override
- public void validate(PipeParameterValidator validator) {
+ public void validate(final PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}
@Override
- public void customize(PipeParameters parameters,
PipeProcessorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeProcessorRuntimeConfiguration
configuration)
throws Exception {
this.aggregateSeries = new
PartialPath(parameters.getString(AGGREGATE_SERIES_KEY));
}
@Override
- public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector) {
+ public void process(
+ final TabletInsertionEvent tabletInsertionEvent, final EventCollector
eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}
@Override
- public void process(Event event, EventCollector eventCollector) throws
Exception {
+ public void process(final Event event, final EventCollector eventCollector)
throws Exception {
if (event instanceof PipeHeartbeatEvent) {
- Tablet tablet =
+ final Tablet tablet =
new Tablet(
aggregateSeries.getDevice(),
Collections.singletonList(
@@ -73,7 +75,7 @@ public class CountPointProcessor implements PipeProcessor {
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0,
writePointCount.get());
eventCollector.collect(
- new PipeRawTabletInsertionEvent(tablet, false, null, null, null,
false));
+ new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null,
false));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index fb8507be9d4..abfea097dbf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -64,6 +64,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
return PipeConfigNodeAgent.runtime().isShutdown();
}
+ @Override
+ protected void thawRate(final String pipeName, final long creationTime) {
+ PipeConfigNodeRemainingTimeMetrics.getInstance().thawRate(pipeName + "_" +
creationTime);
+ }
+
+ @Override
+ protected void freezeRate(final String pipeName, final long creationTime) {
+ PipeConfigNodeRemainingTimeMetrics.getInstance().freezeRate(pipeName + "_"
+ creationTime);
+ }
+
@Override
protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta
pipeMetaFromConfigNode)
throws IllegalPathException {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index dcdc0f8f81d..56c9d012c3c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -90,7 +90,7 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
public PipeConfigRegionSnapshotEvent(
final String snapshotPath, final String templateFilePath, final
CNSnapshotFileType type) {
- this(snapshotPath, templateFilePath, type, null, null, null);
+ this(snapshotPath, templateFilePath, type, null, 0, null, null);
}
public PipeConfigRegionSnapshotEvent(
@@ -98,9 +98,15 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
final String templateFilePath,
final CNSnapshotFileType type,
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
- super(pipeName, pipeTaskMeta, pattern,
PipeConfigNodeSnapshotResourceManager.getInstance());
+ super(
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ pattern,
+ PipeConfigNodeSnapshotResourceManager.getInstance());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ?
templateFilePath : "";
this.fileType = type;
@@ -157,12 +163,13 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionSnapshotEvent(
- snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta,
pattern);
+ snapshotPath, templateFilePath, fileType, pipeName, creationTime,
pipeTaskMeta, pattern);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index c6996907460..0d780bafca1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -41,16 +41,17 @@ public class PipeConfigRegionWritePlanEvent extends
PipeWritePlanEvent {
public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan, final boolean
isGeneratedByPipe) {
- this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
+ this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
}
public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan,
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
- super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+ super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
}
@@ -61,12 +62,13 @@ public class PipeConfigRegionWritePlanEvent extends
PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionWritePlanEvent(
- configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
+ configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern,
false);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 378251eb45c..557e355596e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.pipe.metric;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -110,6 +109,26 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
}
}
+ public void thawRate(final String pipeID) {
+ if (!remainingTimeOperatorMap.containsKey(pipeID)) {
+ LOGGER.warn(
+ "Failed to thaw pipe remaining time rate, RemainingTimeOperator({})
does not exist",
+ pipeID);
+ return;
+ }
+ remainingTimeOperatorMap.get(pipeID).thawRate(true);
+ }
+
+ public void freezeRate(final String pipeID) {
+ if (!remainingTimeOperatorMap.containsKey(pipeID)) {
+ LOGGER.warn(
+ "Failed to freeze pipe remaining time rate,
RemainingTimeOperator({}) does not exist",
+ pipeID);
+ return;
+ }
+ remainingTimeOperatorMap.get(pipeID).freezeRate(true);
+ }
+
public void deregister(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
@@ -122,12 +141,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
}
}
- public void markRegionCommit(final PipeTaskRuntimeEnvironment
pipeTaskRuntimeEnvironment) {
- // Filter commit attempt from assigner
- final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
- final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
- final String pipeID = pipeName + "_" + creationTime;
-
+ public void markRegionCommit(final String pipeID, final boolean
isDataRegion) {
if (Objects.isNull(metricService)) {
return;
}
@@ -137,12 +151,6 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
"Failed to mark pipe region commit, RemainingTimeOperator({}) does
not exist", pipeID);
return;
}
- // Prevent not set pipeName / creation times & potential differences
between pipeNames and
- // creation times
- if (!Objects.equals(pipeName, operator.getPipeName())
- || !Objects.equals(creationTime, operator.getCreationTime())) {
- return;
- }
operator.markConfigRegionCommit();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index 201aa775de5..7a5044b0c61 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -19,77 +19,72 @@
package org.apache.iotdb.confignode.manager.pipe.metric;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
+import
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
import
org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentialMovingAverages;
import com.codahale.metrics.Meter;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
-class PipeConfigNodeRemainingTimeOperator {
+class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
- private static final long CONFIG_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60
* 60L; // 1 year
+ private final Set<IoTDBConfigRegionExtractor> configRegionExtractors =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final AtomicReference<Meter> configRegionCommitMeter = new
AtomicReference<>(null);
- private String pipeName;
- private long creationTime = 0;
-
- private final ConcurrentMap<IoTDBConfigRegionExtractor,
IoTDBConfigRegionExtractor>
- configRegionExtractors = new ConcurrentHashMap<>();
- private final Meter configRegionCommitMeter =
- new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
-
- private double lastConfigRegionCommitSmoothingValue = Long.MIN_VALUE;
-
- //////////////////////////// Tags ////////////////////////////
-
- String getPipeName() {
- return pipeName;
- }
-
- long getCreationTime() {
- return creationTime;
- }
+ private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;
//////////////////////////// Remaining time calculation
////////////////////////////
/**
- * This will calculate the estimated remaining time of the given pipe's
config region subTask.
+ * This will calculate the estimated remaining time of the given pipe's
{@link
+ * PipeConfigNodeSubtask}.
*
* @return The estimated remaining time
*/
double getRemainingTime() {
- final double pipeRemainingTimeCommitRateSmoothingFactor =
-
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
+ final PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
+ PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
// Do not calculate heartbeat event
final long totalConfigRegionWriteEventCount =
- configRegionExtractors.keySet().stream()
+ configRegionExtractors.stream()
.map(IoTDBConfigRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
- lastConfigRegionCommitSmoothingValue =
- lastConfigRegionCommitSmoothingValue == Long.MIN_VALUE
- ? configRegionCommitMeter.getOneMinuteRate()
- : pipeRemainingTimeCommitRateSmoothingFactor
- * configRegionCommitMeter.getOneMinuteRate()
- + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
- * lastConfigRegionCommitSmoothingValue;
+ configRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ lastConfigRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+ }
+ return meter;
+ });
+
final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
+ notifyEmpty();
configRegionRemainingTime = 0;
} else {
+ notifyNonEmpty();
configRegionRemainingTime =
lastConfigRegionCommitSmoothingValue <= 0
? Double.MAX_VALUE
: totalConfigRegionWriteEventCount /
lastConfigRegionCommitSmoothingValue;
}
- return configRegionRemainingTime >= CONFIG_NODE_REMAINING_MAX_SECONDS
- ? CONFIG_NODE_REMAINING_MAX_SECONDS
+ return configRegionRemainingTime >= REMAINING_MAX_SECONDS
+ ? REMAINING_MAX_SECONDS
: configRegionRemainingTime;
}
@@ -97,17 +92,37 @@ class PipeConfigNodeRemainingTimeOperator {
void register(final IoTDBConfigRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(),
extractor.getCreationTime());
- configRegionExtractors.put(extractor, extractor);
- }
-
- private void setNameAndCreationTime(final String pipeName, final long
creationTime) {
- this.pipeName = pipeName;
- this.creationTime = creationTime;
+ configRegionExtractors.add(extractor);
}
//////////////////////////// Rate ////////////////////////////
void markConfigRegionCommit() {
- configRegionCommitMeter.mark();
+ configRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
+ return meter;
+ });
+ }
+
+ //////////////////////////// Switch ////////////////////////////
+
+ @Override
+ public synchronized void thawRate(final boolean isStartPipe) {
+ super.thawRate(isStartPipe);
+ // The stopped pipe's rate should only be thawed by "startPipe" command
+ if (isStopped) {
+ return;
+ }
+ configRegionCommitMeter.compareAndSet(
+ null, new Meter(new ExponentialMovingAverages(),
Clock.defaultClock()));
+ }
+
+ @Override
+ public synchronized void freezeRate(final boolean isStopPipe) {
+ super.freezeRate(isStopPipe);
+ configRegionCommitMeter.set(null);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
index 36eec65b69e..7e676903857 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeConfigNodeTaskStage.java
@@ -32,12 +32,12 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
private final PipeConfigNodeSubtask subtask;
public PipeConfigNodeTaskStage(
- String pipeName,
- long creationTime,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes,
- PipeTaskMeta pipeTaskMeta) {
+ final String pipeName,
+ final long creationTime,
+ final Map<String, String> extractorAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> connectorAttributes,
+ final PipeTaskMeta pipeTaskMeta) {
try {
subtask =
@@ -48,7 +48,7 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
processorAttributes,
connectorAttributes,
pipeTaskMeta);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException(
String.format(
"Failed to create subtask for pipe %s, creation time %d",
pipeName, creationTime),
@@ -63,8 +63,6 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
@Override
public void startSubtask() throws PipeException {
- // IoTDBConfigRegionExtractor is started by executor because starting
- // here may cause deadlock when triggering snapshot
PipeConfigNodeSubtaskExecutor.getInstance().start(subtask.getTaskID());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index a6abcbb9e9a..1c3862bb991 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -237,6 +237,17 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
});
}
+ @Override
+ protected void thawRate(final String pipeName, final long creationTime) {
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName +
"_" + creationTime);
+ }
+
+ @Override
+ protected void freezeRate(final String pipeName, final long creationTime) {
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .freezeRate(pipeName + "_" + creationTime);
+ }
+
@Override
protected boolean dropPipe(final String pipeName, final long creationTime) {
if (!super.dropPipe(pipeName, creationTime)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index ff6e34fffcf..4803158db2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -31,7 +31,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
private final UserDefinedEvent userDefinedEvent;
private final EnrichedEvent enrichedEvent;
- public static Event maybeOf(Event event) {
+ public static Event maybeOf(final Event event) {
return event instanceof UserDefinedEvent
&& ((UserDefinedEvent) event).getSourceEvent() instanceof
EnrichedEvent
? new UserDefinedEnrichedEvent(
@@ -39,9 +39,11 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
: event;
}
- private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent,
EnrichedEvent enrichedEvent) {
+ private UserDefinedEnrichedEvent(
+ final UserDefinedEvent userDefinedEvent, final EnrichedEvent
enrichedEvent) {
super(
enrichedEvent.getPipeName(),
+ enrichedEvent.getCreationTime(),
enrichedEvent.getPipeTaskMeta(),
enrichedEvent.getPipePattern(),
enrichedEvent.getStartTime(),
@@ -55,12 +57,12 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent
{
}
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
return
enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage);
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
return
enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage);
}
@@ -71,13 +73,14 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent
{
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- pipeName, pipeTaskMeta, pattern, startTime, endTime);
+ pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 9738f86a5dd..f82d799b7d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -38,7 +38,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
private final String dataRegionId;
- private String pipeName;
private long timePublished;
private long timeAssigned;
@@ -60,18 +59,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(final String dataRegionId, final boolean
shouldPrintMessage) {
- super(null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ super(null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
}
public PipeHeartbeatEvent(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final String dataRegionId,
final long timePublished,
final boolean shouldPrintMessage) {
- super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE,
Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
this.shouldPrintMessage = shouldPrintMessage;
@@ -100,6 +100,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
@@ -107,7 +108,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report
exceptions.
// Here we ignore parameters `pattern`, `startTime`, and `endTime`.
return new PipeHeartbeatEvent(
- pipeName, pipeTaskMeta, dataRegionId, timePublished,
shouldPrintMessage);
+ pipeName, creationTime, pipeTaskMeta, dataRegionId, timePublished,
shouldPrintMessage);
}
@Override
@@ -128,12 +129,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
/////////////////////////////// Delay Reporting
///////////////////////////////
- public void bindPipeName(final String pipeName) {
- if (shouldPrintMessage) {
- this.pipeName = pipeName;
- }
- }
-
public void onPublished() {
if (shouldPrintMessage) {
timePublished = System.currentTimeMillis();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 1b747652e1c..cb2183b2000 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -105,6 +105,7 @@ public class PipeRowCollector implements RowCollector {
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
+ sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
false));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index d647285bdc6..60bbf95feee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -78,7 +78,7 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
public PipeSchemaRegionSnapshotEvent(
final String mTreeSnapshotPath, final String tagLogSnapshotPath, final
String databaseName) {
- this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, null,
null);
+ this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null,
null);
}
public PipeSchemaRegionSnapshotEvent(
@@ -86,9 +86,10 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
final String tagLogSnapshotPath,
final String databaseName,
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
- super(pipeName, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
+ super(pipeName, creationTime, pipeTaskMeta, pattern,
PipeResourceManager.snapshot());
this.mTreeSnapshotPath = mTreeSnapshotPath;
this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ?
tagLogSnapshotPath : "";
this.databaseName = databaseName;
@@ -145,12 +146,19 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeSchemaRegionSnapshotEvent(
- mTreeSnapshotPath, tagLogSnapshotPath, databaseName, pipeName,
pipeTaskMeta, pattern);
+ mTreeSnapshotPath,
+ tagLogSnapshotPath,
+ databaseName,
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ pattern);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index 6e0b5cccf86..d6eefe086e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -40,16 +40,17 @@ public class PipeSchemaRegionWritePlanEvent extends
PipeWritePlanEvent {
}
public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean
isGeneratedByPipe) {
- this(planNode, null, null, null, isGeneratedByPipe);
+ this(planNode, null, 0, null, null, isGeneratedByPipe);
}
public PipeSchemaRegionWritePlanEvent(
final PlanNode planNode,
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
- super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+ super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
this.planNode = planNode;
}
@@ -60,12 +61,13 @@ public class PipeSchemaRegionWritePlanEvent extends
PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeSchemaRegionWritePlanEvent(
- planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
+ planNode, pipeName, creationTime, pipeTaskMeta, pattern,
isGeneratedByPipe);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 11711fa1d03..b9ddfd0d55d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -68,11 +68,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private ProgressIndex progressIndex;
public PipeInsertNodeTabletInsertionEvent(
- WALEntryHandler walEntryHandler,
- PartialPath devicePath,
- ProgressIndex progressIndex,
- boolean isAligned,
- boolean isGeneratedByPipe) {
+ final WALEntryHandler walEntryHandler,
+ final PartialPath devicePath,
+ final ProgressIndex progressIndex,
+ final boolean isAligned,
+ final boolean isGeneratedByPipe) {
this(
walEntryHandler,
devicePath,
@@ -80,6 +80,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
isAligned,
isGeneratedByPipe,
null,
+ 0,
null,
null,
Long.MIN_VALUE,
@@ -87,17 +88,18 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
private PipeInsertNodeTabletInsertionEvent(
- WALEntryHandler walEntryHandler,
- PartialPath devicePath,
- ProgressIndex progressIndex,
- boolean isAligned,
- boolean isGeneratedByPipe,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
- super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+ final WALEntryHandler walEntryHandler,
+ final PartialPath devicePath,
+ final ProgressIndex progressIndex,
+ final boolean isAligned,
+ final boolean isGeneratedByPipe,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
+ super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.walEntryHandler = walEntryHandler;
// Record device path here so there's no need to get it from InsertNode
cache later.
this.devicePath = devicePath;
@@ -129,11 +131,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
try {
PipeResourceManager.wal().pin(walEntryHandler);
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for memtable %d error. Holder Message:
%s",
@@ -144,7 +146,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
PipeResourceManager.wal().unpin(walEntryHandler);
// Release the containers' memory.
@@ -153,7 +155,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
dataContainers = null;
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for memtable %d error. Holder Message:
%s",
@@ -164,7 +166,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public void bindProgressIndex(ProgressIndex progressIndex) {
+ public void bindProgressIndex(final ProgressIndex progressIndex) {
this.progressIndex = progressIndex;
}
@@ -175,11 +177,12 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
devicePath,
@@ -187,6 +190,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
isAligned,
isGeneratedByPipe,
pipeName,
+ creationTime,
pipeTaskMeta,
pattern,
startTime,
@@ -224,7 +228,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
throw new UnSupportedDataTypeException(
String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Exception occurred when determining the event time of
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}].
Returning true to ensure data integrity.",
this,
@@ -238,7 +242,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
- public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
+ public Iterable<TabletInsertionEvent> processRowByRow(
+ final BiConsumer<Row, RowCollector> consumer) {
return initDataContainers().stream()
.map(tabletInsertionDataContainer ->
tabletInsertionDataContainer.processRowByRow(consumer))
.flatMap(Collection::stream)
@@ -246,7 +251,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
@Override
- public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
+ public Iterable<TabletInsertionEvent> processTablet(
+ final BiConsumer<Tablet, RowCollector> consumer) {
return initDataContainers().stream()
.map(tabletInsertionDataContainer ->
tabletInsertionDataContainer.processTablet(consumer))
.flatMap(Collection::stream)
@@ -334,7 +340,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
.map(
tablet ->
new PipeRawTabletInsertionEvent(
- tablet, isAligned, pipeName, pipeTaskMeta, this,
false))
+ tablet, isAligned, pipeName, creationTime,
pipeTaskMeta, this, false))
.filter(event -> !event.hasNoNeedParsingAndIsEmpty())
.collect(Collectors.toList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index e792f580981..239f73b3c32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -52,16 +52,17 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
private ProgressIndex overridingProgressIndex;
private PipeRawTabletInsertionEvent(
- Tablet tablet,
- boolean isAligned,
- EnrichedEvent sourceEvent,
- boolean needToReport,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
- super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+ final Tablet tablet,
+ final boolean isAligned,
+ final EnrichedEvent sourceEvent,
+ final boolean needToReport,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
+ super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.tablet = Objects.requireNonNull(tablet);
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
@@ -69,18 +70,20 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
public PipeRawTabletInsertionEvent(
- Tablet tablet,
- boolean isAligned,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- EnrichedEvent sourceEvent,
- boolean needToReport) {
+ final Tablet tablet,
+ final boolean isAligned,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final EnrichedEvent sourceEvent,
+ final boolean needToReport) {
this(
tablet,
isAligned,
sourceEvent,
needToReport,
pipeName,
+ creationTime,
pipeTaskMeta,
null,
Long.MIN_VALUE,
@@ -88,28 +91,30 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
@TestOnly
- public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
- this(tablet, isAligned, null, false, null, null, null, Long.MIN_VALUE,
Long.MAX_VALUE);
+ public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean
isAligned) {
+ this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE,
Long.MAX_VALUE);
}
@TestOnly
- public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned,
PipePattern pattern) {
- this(tablet, isAligned, null, false, null, null, pattern, Long.MIN_VALUE,
Long.MAX_VALUE);
+ public PipeRawTabletInsertionEvent(
+ final Tablet tablet, final boolean isAligned, final PipePattern pattern)
{
+ this(tablet, isAligned, null, false, null, 0, null, pattern,
Long.MIN_VALUE, Long.MAX_VALUE);
}
@TestOnly
- public PipeRawTabletInsertionEvent(Tablet tablet, long startTime, long
endTime) {
- this(tablet, false, null, false, null, null, null, startTime, endTime);
+ public PipeRawTabletInsertionEvent(
+ final Tablet tablet, final long startTime, final long endTime) {
+ this(tablet, false, null, false, null, 0, null, null, startTime, endTime);
}
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocateWithRetry(tablet);
return true;
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
allocatedMemoryBlock.close();
// Record the deviceId before the memory is released,
@@ -130,7 +135,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
@Override
- public void bindProgressIndex(ProgressIndex overridingProgressIndex) {
+ public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
// Normally not all events need to report progress, but if the
overridingProgressIndex
// is given, indicating that the progress needs to be reported.
if (Objects.nonNull(overridingProgressIndex)) {
@@ -152,17 +157,19 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeRawTabletInsertionEvent(
tablet,
isAligned,
sourceEvent,
needToReport,
pipeName,
+ creationTime,
pipeTaskMeta,
pattern,
startTime,
@@ -196,7 +203,8 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
- public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
+ public Iterable<TabletInsertionEvent> processRowByRow(
+ final BiConsumer<Row, RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, pipePattern);
@@ -205,7 +213,8 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
@Override
- public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
+ public Iterable<TabletInsertionEvent> processTablet(
+ final BiConsumer<Tablet, RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, pipePattern);
@@ -241,7 +250,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
- convertToTablet(), isAligned, pipeName, pipeTaskMeta, this,
needToReport);
+ convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta,
this, needToReport);
}
public boolean hasNoNeedParsingAndIsEmpty() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index c1252fe668f..cc423f1bf3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -38,8 +38,11 @@ public class PipeTerminateEvent extends EnrichedEvent {
private final int dataRegionId;
public PipeTerminateEvent(
- final String pipeName, final PipeTaskMeta pipeTaskMeta, final int
dataRegionId) {
- super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final int dataRegionId) {
+ super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE,
Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
}
@@ -61,13 +64,14 @@ public class PipeTerminateEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
// Should record PipeTaskMeta, for the terminateEvent shall report
progress to
// notify the pipeTask it's completed.
- return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+ return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta,
dataRegionId);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 55739bf8f32..a7c0cc5c255 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -59,7 +59,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private TsFileInsertionDataContainer dataContainer;
public PipeTsFileInsertionEvent(
- TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+ final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
// The modFile must be copied before the event is assigned to the
listening pipes
this(
resource,
@@ -67,6 +67,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
isLoaded,
isGeneratedByPipe,
null,
+ 0,
null,
null,
Long.MIN_VALUE,
@@ -74,16 +75,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
}
public PipeTsFileInsertionEvent(
- TsFileResource resource,
- boolean isWithMod,
- boolean isLoaded,
- boolean isGeneratedByPipe,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
- super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
+ final TsFileResource resource,
+ final boolean isWithMod,
+ final boolean isLoaded,
+ final boolean isGeneratedByPipe,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
+ super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.resource = resource;
tsFile = resource.getTsFile();
@@ -186,14 +188,14 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
try {
tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile,
true, resource);
if (isWithMod) {
modFile = PipeResourceManager.tsfile().increaseFileReference(modFile,
false, null);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for TsFile %s or modFile %s error.
Holder Message: %s",
@@ -204,14 +206,14 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
PipeResourceManager.tsfile().decreaseFileReference(tsFile);
if (isWithMod) {
PipeResourceManager.tsfile().decreaseFileReference(modFile);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for TsFile %s error. Holder Message:
%s",
@@ -231,7 +233,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
return MinimumProgressIndex.INSTANCE;
}
return resource.getMaxProgressIndexAfterClose();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.warn(
String.format(
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath()));
@@ -242,17 +244,19 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeTsFileInsertionEvent(
resource,
isWithMod,
isLoaded,
isGeneratedByPipe,
pipeName,
+ creationTime,
pipeTaskMeta,
pattern,
startTime,
@@ -305,7 +309,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
return Collections.emptyList();
}
return initDataContainer().toTabletInsertionEvents();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
close();
@@ -325,7 +329,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
}
return dataContainer;
- } catch (IOException e) {
+ } catch (final IOException e) {
close();
final String errorMsg = String.format("Read TsFile %s error.",
resource.getTsFilePath());
@@ -334,7 +338,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
}
}
- public long count(boolean skipReportOnCommit) throws IOException {
+ public long count(final boolean skipReportOnCommit) throws IOException {
long count = 0;
if (shouldParseTime()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 36dfc3df458..03e61cf2809 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -252,6 +252,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
+ sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
sourceEvent,
true);
@@ -262,6 +263,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
+ sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
sourceEvent,
false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 77df1f0292b..ce8e7d9f0fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -58,7 +58,13 @@ public class PipeRealtimeEvent extends EnrichedEvent {
// PipeTaskMeta is used to report the progress of the event, the
PipeRealtimeEvent
// is only used in the realtime event extractor, which does not need to
report the progress
// of the event, so the pipeTaskMeta is always null.
- super(event != null ? event.getPipeName() : null, pipeTaskMeta, pattern,
startTime, endTime);
+ super(
+ event != null ? event.getPipeName() : null,
+ event != null ? event.getCreationTime() : 0,
+ pipeTaskMeta,
+ pattern,
+ startTime,
+ endTime);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@@ -140,13 +146,14 @@ public class PipeRealtimeEvent extends EnrichedEvent {
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeRealtimeEvent(
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- pipeName, pipeTaskMeta, pattern, startTime, endTime),
+ pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime),
this.tsFileEpoch,
this.device2Measurements,
pipeTaskMeta,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index b1e2649e6db..237dace7c02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -94,6 +94,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
private String pipeName;
+ private long creationTime;
+
private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
@@ -250,6 +252,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
(PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
+ creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
startIndex = environment.getPipeTaskMeta().getProgressIndex();
@@ -563,7 +566,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
if (resource == null) {
isTerminateSignalSent = true;
final PipeTerminateEvent terminateEvent =
- new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+ new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta,
dataRegionId);
terminateEvent.increaseReferenceCount(
PipeHistoricalDataRegionTsFileExtractor.class.getName());
return terminateEvent;
@@ -576,6 +579,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
false,
false,
pipeName,
+ creationTime,
pipeTaskMeta,
pipePattern,
historicalDataExtractionStartTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c4bec4246e4..c26db376da0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -74,6 +74,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
LoggerFactory.getLogger(PipeRealtimeDataRegionExtractor.class);
protected String pipeName;
+ protected long creationTime;
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
@@ -167,7 +168,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// indexed by the taskID of IoTDBDataRegionExtractor. To avoid
PipeRealtimeDataRegionExtractor
// holding a reference to IoTDBDataRegionExtractor, the taskID should be
constructed to
// match that of IoTDBDataRegionExtractor.
- final long creationTime = environment.getCreationTime();
+ creationTime = environment.getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
@@ -387,6 +388,10 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
return pipeName;
}
+ public final long getCreationTime() {
+ return creationTime;
+ }
+
public final PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e85fd5b0a1d..16f9ebf61d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -76,6 +76,7 @@ public class PipeDataRegionAssigner implements Closeable {
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeName(),
+ extractor.getCreationTime(),
extractor.getPipeTaskMeta(),
extractor.getPipePattern(),
extractor.getRealtimeDataExtractionStartTime(),
@@ -90,7 +91,6 @@ public class PipeDataRegionAssigner implements Closeable {
extractor.extract(copiedEvent);
if (innerEvent instanceof PipeHeartbeatEvent) {
- ((PipeHeartbeatEvent)
innerEvent).bindPipeName(extractor.getPipeName());
((PipeHeartbeatEvent) innerEvent).onAssigned();
}
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index c1ecb8460b4..8f9008f5d9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -19,17 +19,12 @@
package org.apache.iotdb.db.pipe.metric;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
-import org.apache.iotdb.db.schemaengine.SchemaEngine;
-import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -151,6 +146,26 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
+ public void thawRate(final String pipeID) {
+ if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
+ // In dataNode, the "thawRate" may be called when there are no subtasks,
and we call
+ // "startPipe".
+ // We thaw it later in "startPipeTask".
+ return;
+ }
+ remainingEventAndTimeOperatorMap.get(pipeID).thawRate(true);
+ }
+
+ public void freezeRate(final String pipeID) {
+ if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
+ // In dataNode, the "freezeRate" may be called when there are no
subtasks, and we call
+ // "stopPipe" after calling "startPipe".
+ // We do nothing because in that case the rate is not thawed initially
+ return;
+ }
+ remainingEventAndTimeOperatorMap.get(pipeID).freezeRate(true);
+ }
+
public void deregister(final String pipeID) {
if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
@@ -163,13 +178,7 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
- public void markRegionCommit(final PipeTaskRuntimeEnvironment
pipeTaskRuntimeEnvironment) {
- // Filter commit attempt from assigner
- final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
- final int regionId = pipeTaskRuntimeEnvironment.getRegionId();
- final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
- final String pipeID = pipeName + "_" + creationTime;
-
+ public void markRegionCommit(final String pipeID, final boolean
isDataRegion) {
if (Objects.isNull(metricService)) {
return;
}
@@ -181,19 +190,10 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
pipeID);
return;
}
- // Prevent not set pipeName / creation times & potential differences
between pipeNames and
- // creation times
- if (!Objects.equals(pipeName, operator.getPipeName())
- || !Objects.equals(creationTime, operator.getCreationTime())) {
- return;
- }
- // Prevent empty region-ids
- if (StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))) {
+ if (isDataRegion) {
operator.markDataRegionCommit();
- }
-
- if (SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new
SchemaRegionId(regionId))) {
+ } else {
operator.markSchemaRegionCommit();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index f7a08295f40..dcab93d535c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -19,61 +19,49 @@
package org.apache.iotdb.db.pipe.metric;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentialMovingAverages;
import com.codahale.metrics.Meter;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
-class PipeDataNodeRemainingEventAndTimeOperator {
+class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
+ private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<PipeConnectorSubtask> dataRegionConnectors =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final AtomicReference<Meter> dataRegionCommitMeter = new
AtomicReference<>(null);
+ private final AtomicReference<Meter> schemaRegionCommitMeter = new
AtomicReference<>(null);
- private static final long DATA_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 *
60L; // 1 year
-
- private String pipeName;
- private long creationTime = 0;
-
- private final ConcurrentMap<IoTDBDataRegionExtractor,
IoTDBDataRegionExtractor>
- dataRegionExtractors = new ConcurrentHashMap<>();
- private final ConcurrentMap<PipeConnectorSubtask, PipeConnectorSubtask>
dataRegionConnectors =
- new ConcurrentHashMap<>();
- private final ConcurrentMap<IoTDBSchemaRegionExtractor,
IoTDBSchemaRegionExtractor>
- schemaRegionExtractors = new ConcurrentHashMap<>();
- private final Meter dataRegionCommitMeter =
- new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
- private final Meter schemaRegionCommitMeter =
- new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
-
- private double lastDataRegionCommitSmoothingValue = Long.MIN_VALUE;
- private double lastSchemaRegionCommitSmoothingValue = Long.MIN_VALUE;
-
- //////////////////////////// Tags ////////////////////////////
-
- String getPipeName() {
- return pipeName;
- }
-
- long getCreationTime() {
- return creationTime;
- }
+ private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
+ private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
//////////////////////////// Remaining event & time calculation
////////////////////////////
long getRemainingEvents() {
- return dataRegionExtractors.keySet().stream()
+ return dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
- + dataRegionConnectors.keySet().stream()
+ + dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- + schemaRegionExtractors.keySet().stream()
+ + schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@@ -82,39 +70,41 @@ class PipeDataNodeRemainingEventAndTimeOperator {
/**
* This will calculate the estimated remaining time of pipe.
*
- * <p>Note: The events in pipe assigner are omitted.
+ * <p>Note: The {@link Event}s in pipe assigner are omitted.
*
* @return The estimated remaining time
*/
double getRemainingTime() {
- final double pipeRemainingTimeCommitRateSmoothingFactor =
-
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
+ final PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
+ PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
// Do not take heartbeat event into account
final int totalDataRegionWriteEventCount =
- dataRegionExtractors.keySet().stream()
+ dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
- + dataRegionConnectors.keySet().stream()
+ + dataRegionConnectors.stream()
.map(connectorSubtask ->
connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- - dataRegionExtractors.keySet().stream()
+ - dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0)
- - dataRegionConnectors.keySet().stream()
+ - dataRegionConnectors.stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0);
- lastDataRegionCommitSmoothingValue =
- lastDataRegionCommitSmoothingValue == Long.MIN_VALUE
- ? dataRegionCommitMeter.getOneMinuteRate()
- : pipeRemainingTimeCommitRateSmoothingFactor *
dataRegionCommitMeter.getOneMinuteRate()
- + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
- * lastDataRegionCommitSmoothingValue;
+ dataRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ lastDataRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+ }
+ return meter;
+ });
final double dataRegionRemainingTime;
if (totalDataRegionWriteEventCount <= 0) {
dataRegionRemainingTime = 0;
@@ -126,18 +116,19 @@ class PipeDataNodeRemainingEventAndTimeOperator {
}
final long totalSchemaRegionWriteEventCount =
- schemaRegionExtractors.keySet().stream()
+ schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
- lastSchemaRegionCommitSmoothingValue =
- lastSchemaRegionCommitSmoothingValue == Long.MIN_VALUE
- ? schemaRegionCommitMeter.getOneMinuteRate()
- : pipeRemainingTimeCommitRateSmoothingFactor
- * schemaRegionCommitMeter.getOneMinuteRate()
- + (1 - pipeRemainingTimeCommitRateSmoothingFactor)
- * lastSchemaRegionCommitSmoothingValue;
+ schemaRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ lastSchemaRegionCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+ }
+ return meter;
+ });
final double schemaRegionRemainingTime;
if (totalSchemaRegionWriteEventCount <= 0) {
schemaRegionRemainingTime = 0;
@@ -148,40 +139,77 @@ class PipeDataNodeRemainingEventAndTimeOperator {
: totalSchemaRegionWriteEventCount /
lastSchemaRegionCommitSmoothingValue;
}
+ if (totalDataRegionWriteEventCount + totalSchemaRegionWriteEventCount ==
0) {
+ notifyEmpty();
+ } else {
+ notifyNonEmpty();
+ }
+
final double result = Math.max(dataRegionRemainingTime,
schemaRegionRemainingTime);
- return result >= DATA_NODE_REMAINING_MAX_SECONDS ?
DATA_NODE_REMAINING_MAX_SECONDS : result;
+ return result >= REMAINING_MAX_SECONDS ? REMAINING_MAX_SECONDS : result;
}
//////////////////////////// Register & deregister (pipe integration)
////////////////////////////
void register(final IoTDBDataRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(),
extractor.getCreationTime());
- dataRegionExtractors.put(extractor, extractor);
+ dataRegionExtractors.add(extractor);
}
void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName,
final long creationTime) {
setNameAndCreationTime(pipeName, creationTime);
- dataRegionConnectors.put(connectorSubtask, connectorSubtask);
+ dataRegionConnectors.add(connectorSubtask);
}
void register(final IoTDBSchemaRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(),
extractor.getCreationTime());
- schemaRegionExtractors.put(extractor, extractor);
- }
-
- private void setNameAndCreationTime(final String pipeName, final long
creationTime) {
- this.pipeName = pipeName;
- this.creationTime = creationTime;
+ schemaRegionExtractors.add(extractor);
}
//////////////////////////// Rate ////////////////////////////
void markDataRegionCommit() {
- dataRegionCommitMeter.mark();
+ dataRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
+ return meter;
+ });
}
void markSchemaRegionCommit() {
- schemaRegionCommitMeter.mark();
+ schemaRegionCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ meter.mark();
+ }
+ return meter;
+ });
+ }
+
+ //////////////////////////// Switch ////////////////////////////
+
+ // Thread-safe & Idempotent
+ @Override
+ public synchronized void thawRate(final boolean isStartPipe) {
+ super.thawRate(isStartPipe);
+ // The stopped pipe's rate should only be thawed by "startPipe" command
+ if (isStopped) {
+ return;
+ }
+ dataRegionCommitMeter.compareAndSet(
+ null, new Meter(new ExponentialMovingAverages(),
Clock.defaultClock()));
+ schemaRegionCommitMeter.compareAndSet(
+ null, new Meter(new ExponentialMovingAverages(),
Clock.defaultClock()));
+ }
+
+ // Thread-safe & Idempotent
+ @Override
+ public synchronized void freezeRate(final boolean isStopPipe) {
+ super.freezeRate(isStopPipe);
+ dataRegionCommitMeter.set(null);
+ schemaRegionCommitMeter.set(null);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index 2017051866f..07f952277aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -253,7 +253,7 @@ public class TwoStageCountProcessor implements
PipeProcessor {
tablet.addValue(outputSeries.getMeasurement(), 0,
timestampCountPair.right);
eventCollector.collect(
- new PipeRawTabletInsertionEvent(tablet, false, null, null, null,
false));
+ new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null,
false));
PipeCombineHandlerManager.getInstance()
.updateLastCombinedValue(pipeName, creationTime, timestampCountPair);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 3055568aadf..fd673fe9b95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -142,6 +142,7 @@ public class PipeEventCollector implements EventCollector {
new PipeSchemaRegionWritePlanEvent(
planNode,
deleteDataEvent.getPipeName(),
+ deleteDataEvent.getCreationTime(),
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getPipePattern(),
deleteDataEvent.isGeneratedByPipe()))
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 5e5a1d251dc..e75e207d030 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -95,6 +95,10 @@
<artifactId>metrics-core</artifactId>
<version>1.3.3-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1de7f33df3a..c21f853b8a7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.conf;
import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
@@ -235,7 +236,9 @@ public class CommonConfig {
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
- private double pipeRemainingTimeCommitRateSmoothingFactor = 0.5;
+ private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
+ private PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
+ PipeRemainingTimeRateAverageTime.MEAN;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1020,13 +1023,23 @@ public class CommonConfig {
this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
}
- public double getPipeRemainingTimeCommitRateSmoothingFactor() {
- return pipeRemainingTimeCommitRateSmoothingFactor;
+ public long getPipeRemainingTimeCommitRateAutoSwitchSeconds() {
+ return pipeRemainingTimeCommitRateAutoSwitchSeconds;
}
- public void setPipeRemainingTimeCommitRateSmoothingFactor(
- double pipeRemainingTimeCommitRateSmoothingFactor) {
- this.pipeRemainingTimeCommitRateSmoothingFactor =
pipeRemainingTimeCommitRateSmoothingFactor;
+ public void setPipeRemainingTimeCommitRateAutoSwitchSeconds(
+ long pipeRemainingTimeCommitRateAutoSwitchSeconds) {
+ this.pipeRemainingTimeCommitRateAutoSwitchSeconds =
+ pipeRemainingTimeCommitRateAutoSwitchSeconds;
+ }
+
+ public PipeRemainingTimeRateAverageTime
getPipeRemainingTimeCommitRateAverageTime() {
+ return pipeRemainingTimeCommitRateAverageTime;
+ }
+
+ public void setPipeRemainingTimeCommitRateAverageTime(
+ PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime)
{
+ this.pipeRemainingTimeCommitRateAverageTime =
pipeRemainingTimeCommitRateAverageTime;
}
public double getPipeAllSinksRateLimitBytesPerSecond() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f72ea5e528d..67718e1224c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.conf;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
@@ -541,11 +542,18 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
- config.setPipeRemainingTimeCommitRateSmoothingFactor(
- Double.parseDouble(
+ config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
+ Long.parseLong(
properties.getProperty(
- "pipe_remaining_time_commit_rate_smoothing_factor",
-
String.valueOf(config.getPipeRemainingTimeCommitRateSmoothingFactor()))));
+ "pipe_remaining_time_commit_rate_auto_switch_seconds",
+
String.valueOf(config.getPipeRemainingTimeCommitRateAutoSwitchSeconds()))));
+ config.setPipeRemainingTimeCommitRateAverageTime(
+ PipeRemainingTimeRateAverageTime.valueOf(
+ properties
+ .getProperty(
+ "pipe_remaining_time_commit_rate_average_time",
+
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
+ .trim()));
config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
new file mode 100644
index 00000000000..25f799d09f6
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+import com.codahale.metrics.Meter;
+
+public enum PipeRemainingTimeRateAverageTime {
+ ONE_MINUTE,
+ FIVE_MINUTES,
+ FIFTEEN_MINUTES,
+ MEAN;
+
+ public double getMeterRate(final Meter meter) {
+ switch (this) {
+ case ONE_MINUTE:
+ return meter.getOneMinuteRate();
+ case FIVE_MINUTES:
+ return meter.getFiveMinuteRate();
+ case FIFTEEN_MINUTES:
+ return meter.getFifteenMinuteRate();
+ case MEAN:
+ return meter.getMeanRate();
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "The type %s is not supported in average time of pipe
remaining time.",
+
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 65c1a95aaa3..63dd14c46c9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -272,6 +272,9 @@ public abstract class PipeTaskAgent {
break;
case STOPPED:
if (Objects.requireNonNull(statusInAgent) == PipeStatus.RUNNING) {
+ // Only freeze rate for user stopped pipes
+ // Freeze first to get better results in calculation
+ freezeRate(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
stopPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
} else {
throw new IllegalStateException(
@@ -290,6 +293,10 @@ public abstract class PipeTaskAgent {
}
}
+ protected abstract void thawRate(final String pipeName, final long
creationTime);
+
+ protected abstract void freezeRate(final String pipeName, final long
creationTime);
+
public TPushPipeMetaRespExceptionMessage handleDropPipe(final String
pipeName) {
acquireWriteLock();
try {
@@ -576,9 +583,11 @@ public abstract class PipeTaskAgent {
.getConsensusGroupId2TaskMetaMap()
.values()
.forEach(PipeTaskMeta::clearExceptionMessages);
+
+ thawRate(pipeName, creationTime);
}
- protected void stopPipe(final String pipeName, final long creationTime) {
+ private void stopPipe(final String pipeName, final long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
@@ -856,6 +865,7 @@ public abstract class PipeTaskAgent {
final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta,
consensusGroupId);
if (pipeTask != null) {
pipeTask.start();
+ thawRate(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 48debcce408..5fa4aa8a812 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,8 +140,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
}
- public double getPipeRemainingTimeCommitRateSmoothingFactor() {
- return COMMON_CONFIG.getPipeRemainingTimeCommitRateSmoothingFactor();
+ public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
+ return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
+ }
+
+ public PipeRemainingTimeRateAverageTime
getPipeRemainingTimeCommitRateAverageTime() {
+ return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}
/////////////////////////////// Meta Consistency
///////////////////////////////
@@ -328,8 +333,10 @@ public class PipeConfig {
getPipeListeningQueueTransferSnapshotThreshold());
LOGGER.info("PipeSnapshotExecutionMaxBatchSize: {}",
getPipeSnapshotExecutionMaxBatchSize());
LOGGER.info(
- "PipeRemainingTimeCommitRateSmoothingFactor: {}",
- getPipeRemainingTimeCommitRateSmoothingFactor());
+ "PipeRemainingTimeCommitAutoSwitchSeconds: {}",
+ getPipeRemainingTimeCommitAutoSwitchSeconds());
+ LOGGER.info(
+ "PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index b43e37788cd..8de596a5447 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -48,6 +48,7 @@ public abstract class EnrichedEvent implements Event {
protected final AtomicBoolean isReleased;
protected final String pipeName;
+ protected final long creationTime;
protected final PipeTaskMeta pipeTaskMeta;
protected String committerKey;
@@ -67,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
protected EnrichedEvent(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pipePattern,
final long startTime,
@@ -74,6 +76,7 @@ public abstract class EnrichedEvent implements Event {
referenceCount = new AtomicInteger(0);
isReleased = new AtomicBoolean(false);
this.pipeName = pipeName;
+ this.creationTime = creationTime;
this.pipeTaskMeta = pipeTaskMeta;
this.pipePattern = pipePattern;
this.startTime = startTime;
@@ -129,7 +132,7 @@ public abstract class EnrichedEvent implements Event {
* {@code false} if the {@link EnrichedEvent} is not controlled by the
invoker, which means
* the data stored in the event is not safe to use
*/
- public abstract boolean internallyIncreaseResourceReferenceCount(String
holderMessage);
+ public abstract boolean internallyIncreaseResourceReferenceCount(final
String holderMessage);
/**
* Decrease the {@link EnrichedEvent#referenceCount} of this {@link
EnrichedEvent} by 1. If the
@@ -208,7 +211,7 @@ public abstract class EnrichedEvent implements Event {
* @return {@code true} if the {@link EnrichedEvent#referenceCount} is
decreased successfully,
* {@code true} otherwise
*/
- public abstract boolean internallyDecreaseResourceReferenceCount(String
holderMessage);
+ public abstract boolean internallyDecreaseResourceReferenceCount(final
String holderMessage);
protected void reportProgress() {
if (pipeTaskMeta != null) {
@@ -245,6 +248,14 @@ public abstract class EnrichedEvent implements Event {
return pipeName;
}
+ public final long getCreationTime() {
+ return creationTime;
+ }
+
+ public final boolean isDataRegionEvent() {
+ return !(this instanceof PipeWritePlanEvent) && !(this instanceof
PipeSnapshotEvent);
+ }
+
/**
* Get the pattern string of this {@link EnrichedEvent}.
*
@@ -291,11 +302,12 @@ public abstract class EnrichedEvent implements Event {
}
public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime);
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime);
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
@@ -315,7 +327,7 @@ public abstract class EnrichedEvent implements Event {
this.commitId = commitId;
}
- public void setRebootTimes(int rebootTimes) {
+ public void setRebootTimes(final int rebootTimes) {
this.rebootTimes = rebootTimes;
}
@@ -345,14 +357,14 @@ public abstract class EnrichedEvent implements Event {
* Used for pipeConsensus. In PipeConsensus, we only need committerKey,
commitId and rebootTimes
* to uniquely identify an event
*/
- public boolean equalsInPipeConsensus(Object o) {
+ public boolean equalsInPipeConsensus(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
- EnrichedEvent otherEvent = (EnrichedEvent) o;
+ final EnrichedEvent otherEvent = (EnrichedEvent) o;
return Objects.equals(committerKey, otherEvent.committerKey)
&& commitId == otherEvent.commitId
&& rebootTimes == otherEvent.rebootTimes;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index fa26319e6dc..f130a83c0ee 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -35,10 +35,11 @@ public abstract class PipeSnapshotEvent extends
EnrichedEvent implements Seriali
protected PipeSnapshotEvent(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final PipeSnapshotResourceManager resourceManager) {
- super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
+ super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE,
Long.MAX_VALUE);
this.resourceManager = resourceManager;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index d37aee82aaa..c553e6ce996 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -31,10 +31,11 @@ public abstract class PipeWritePlanEvent extends
EnrichedEvent implements Serial
protected PipeWritePlanEvent(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
- super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
+ super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE,
Long.MAX_VALUE);
this.isGeneratedByPipe = isGeneratedByPipe;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 2851b03593c..4f00c89c5e6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -34,11 +34,12 @@ public class ProgressReportEvent extends EnrichedEvent {
public ProgressReportEvent(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pipePattern,
final long startTime,
final long endTime) {
- super(pipeName, pipeTaskMeta, pipePattern, startTime, endTime);
+ super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime,
endTime);
}
@Override
@@ -64,11 +65,13 @@ public class ProgressReportEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
+ final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
- return new ProgressReportEvent(pipeName, pipeTaskMeta, pattern, startTime,
endTime);
+ return new ProgressReportEvent(
+ pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
index 9d6ce0ffde8..a67e818be98 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
@@ -154,7 +154,12 @@ public abstract class IoTDBNonDataRegionExtractor extends
IoTDBExtractor {
historicalEvents
.remove(0)
.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE,
Long.MAX_VALUE);
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ pipePattern,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
if (historicalEvents.isEmpty()) {
// We only report progress for the last snapshot event.
@@ -180,7 +185,7 @@ public abstract class IoTDBNonDataRegionExtractor extends
IoTDBExtractor {
|| (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
final ProgressReportEvent event =
new ProgressReportEvent(
- pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE,
Long.MAX_VALUE);
+ pipeName, creationTime, pipeTaskMeta, pipePattern,
Long.MIN_VALUE, Long.MAX_VALUE);
event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() -
1));
event.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
return event;
@@ -189,7 +194,7 @@ public abstract class IoTDBNonDataRegionExtractor extends
IoTDBExtractor {
realtimeEvent =
(PipeWritePlanEvent)
realtimeEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE,
Long.MAX_VALUE);
+ pipeName, creationTime, pipeTaskMeta, pipePattern,
Long.MIN_VALUE, Long.MAX_VALUE);
realtimeEvent.bindProgressIndex(new
MetaProgressIndex(iterator.getNextIndex() - 1));
realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
return realtimeEvent;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
new file mode 100644
index 00000000000..97f80e5d985
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeRemainingOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+public abstract class PipeRemainingOperator {
+
+ protected static final long REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; //
1 year
+
+ protected String pipeName;
+ protected long creationTime = 0;
+
+ private long lastEmptyTimeStamp = System.currentTimeMillis();
+ private long lastNonEmptyTimeStamp = System.currentTimeMillis();
+ protected boolean isStopped = true;
+
+ //////////////////////////// Tags ////////////////////////////
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ //////////////////////////// Register & deregister (pipe integration)
////////////////////////////
+
+ protected void setNameAndCreationTime(final String pipeName, final long
creationTime) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ }
+
+ //////////////////////////// Switch ////////////////////////////
+
+ protected void notifyNonEmpty() {
+ final long pipeRemainingTimeCommitAutoSwitchSeconds =
+ PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
+
+ lastNonEmptyTimeStamp = System.currentTimeMillis();
+ if (lastNonEmptyTimeStamp - lastEmptyTimeStamp
+ >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
+ thawRate(false);
+ }
+ }
+
+ protected void notifyEmpty() {
+ final long pipeRemainingTimeCommitAutoSwitchSeconds =
+ PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
+
+ lastEmptyTimeStamp = System.currentTimeMillis();
+ if (lastEmptyTimeStamp - lastNonEmptyTimeStamp
+ >= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
+ freezeRate(false);
+ }
+ }
+
+ public synchronized void thawRate(final boolean isStartPipe) {
+ if (isStartPipe) {
+ isStopped = false;
+ }
+ }
+
+ public synchronized void freezeRate(final boolean isStopPipe) {
+ if (isStopPipe) {
+ isStopped = true;
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 0b740f7e12a..d51f0126612 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.pipe.progress;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
public class PipeEventCommitManager {
@@ -38,7 +37,7 @@ public class PipeEventCommitManager {
// key: pipeName_regionId
private final Map<String, PipeEventCommitter> eventCommitterMap = new
ConcurrentHashMap<>();
- private Consumer<PipeTaskRuntimeEnvironment> commitRateMarker;
+ private BiConsumer<String, Boolean> commitRateMarker;
public void register(
final String pipeName,
@@ -88,10 +87,26 @@ public class PipeEventCommitManager {
}
public void commit(final EnrichedEvent event, final String committerKey) {
- if (committerKey == null
- || event == null
+ if (event == null
|| !event.needToCommit()
- || event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID) {
+ || Objects.isNull(event.getPipeName())
+ || event.getCreationTime() == 0) {
+ return;
+ }
+ if (Objects.nonNull(commitRateMarker)) {
+ try {
+ commitRateMarker.accept(
+ event.getPipeName() + '_' + event.getCreationTime(),
event.isDataRegionEvent());
+ } catch (final Exception e) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Failed to mark commit rate for pipe task: {}, stack trace: {}",
+ committerKey,
+ Thread.currentThread().getStackTrace());
+ }
+ }
+ }
+ if (committerKey == null || event.getCommitId() <=
EnrichedEvent.NO_COMMIT_ID) {
return;
}
final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
@@ -108,20 +123,6 @@ public class PipeEventCommitManager {
}
committer.commit(event);
- if (Objects.nonNull(commitRateMarker)) {
- try {
- commitRateMarker.accept(
- new PipeTaskRuntimeEnvironment(
- committer.getPipeName(), committer.getCreationTime(),
committer.getRegionId()));
- } catch (Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Failed to mark commit rate for pipe: {}, stack trace: {}",
- committerKey,
- Thread.currentThread().getStackTrace());
- }
- }
- }
}
private static String generateCommitterKey(
@@ -129,7 +130,7 @@ public class PipeEventCommitManager {
return String.format("%s_%s_%s", pipeName, regionId, creationTime);
}
- public void setCommitRateMarker(final Consumer<PipeTaskRuntimeEnvironment>
commitRateMarker) {
+ public void setCommitRateMarker(final BiConsumer<String, Boolean>
commitRateMarker) {
this.commitRateMarker = commitRateMarker;
}