This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fc2746e7a58 [IOTDB-5936] Pipe: correct the behaviour of the historical
data collector in realtime only mode (#9987)
fc2746e7a58 is described below
commit fc2746e7a58ea7b0713406e0e6fb311694bbf918
Author: yschengzi <[email protected]>
AuthorDate: Wed May 31 02:57:58 2023 +0800
[IOTDB-5936] Pipe: correct the behaviour of the historical data collector
in realtime only mode (#9987)
The historical data collector now also starts in the "realtime only" mode,
because it needs to collect data created after the pipe is created when a
restart or a leader (master) switch occurs. With this fix, in the "realtime
only" mode the historical data collector compares the generation time of each
TsFile with the creation time of the pipe and skips TsFiles generated before
the pipe was created.
Now, when the historical data collector is customized in the "realtime only"
mode, it seals all working TsFiles in the data region, so that every TsFile
generated afterwards has a generation time later than the creation time of
the pipe.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 2 +-
.../core/collector/IoTDBDataRegionCollector.java | 22 +++--
.../PipeHistoricalDataRegionTsFileCollector.java | 103 +++++++++++++++++----
.../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 12 +--
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 46 +++------
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 4 +-
6 files changed, 117 insertions(+), 72 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 10e8679ac83..d0596127340 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -510,7 +510,7 @@ public class PipeTaskAgent {
PipeTaskMeta pipeTaskMeta) {
if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
final PipeTask pipeTask =
- new PipeTaskBuilder(consensusGroupId, pipeTaskMeta,
pipeStaticMeta).build();
+ new PipeTaskBuilder(pipeStaticMeta, consensusGroupId,
pipeTaskMeta).build();
pipeTask.create();
pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index b26c880b028..37a9c5101a9 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionCollector;
-import
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionFakeCollector;
import
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFakeCollector;
@@ -58,8 +57,9 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
private final AtomicBoolean hasBeenStarted;
- private final ListenableUnboundedBlockingPendingQueue<Event>
collectorPendingQueue;
private final PipeTaskMeta pipeTaskMeta;
+ private final long creationTime;
+ private final ListenableUnboundedBlockingPendingQueue<Event>
collectorPendingQueue;
// TODO: support pattern in historical collector
private PipeHistoricalDataRegionCollector historicalCollector;
@@ -69,15 +69,13 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
public IoTDBDataRegionCollector(
PipeTaskMeta pipeTaskMeta,
+ long creationTime,
ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
- hasBeenStarted = new AtomicBoolean(false);
+ this.hasBeenStarted = new AtomicBoolean(false);
this.pipeTaskMeta = pipeTaskMeta;
+ this.creationTime = creationTime;
this.collectorPendingQueue = collectorPendingQueue;
-
- historicalCollector = new
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
- realtimeCollector =
- new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
}
@Override
@@ -119,8 +117,14 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
// enable historical collector by default
historicalCollector =
parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
- ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta)
- : new PipeHistoricalDataRegionFakeCollector();
+ ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta,
Long.MIN_VALUE)
+ // We define the realtime data as the data generated after the
creation time
+ // of the pipe from user's perspective. But we still need to use
+ // PipeHistoricalDataRegionCollector to collect the realtime data
generated between the
+ // creation time of the pipe and the time when the pipe starts,
because those data
+ // cannot be captured by PipeRealtimeDataRegionCollector, and
should be collected by
+ // PipeHistoricalDataRegionCollector from implementation
perspective.
+ : new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta,
creationTime);
}
private void constructRealtimeCollector(PipeParameters parameters) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 90041ed1d59..c1157d96979 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -34,49 +34,98 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
+import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
+import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
+
public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataRegionCollector {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
+
private final PipeTaskMeta pipeTaskMeta;
private final ProgressIndex startIndex;
private int dataRegionId;
- private long collectStartTime;
- private long collectEndTime;
+ private final long historicalDataCollectionTimeLowerBound;
+ private long historicalDataCollectionStartTime;
+ private long historicalDataCollectionEndTime;
private Queue<PipeTsFileInsertionEvent> pendingQueue;
- public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+ public PipeHistoricalDataRegionTsFileCollector(
+ PipeTaskMeta pipeTaskMeta, long historicalDataCollectionTimeLowerBound) {
this.pipeTaskMeta = pipeTaskMeta;
this.startIndex = pipeTaskMeta.getProgressIndex();
+
+ this.historicalDataCollectionTimeLowerBound =
historicalDataCollectionTimeLowerBound;
}
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
+ validator.validateRequiredAttribute(DATA_REGION_KEY);
}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
- dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
- collectStartTime =
-
parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME)
+ dataRegionId = parameters.getInt(DATA_REGION_KEY);
+ historicalDataCollectionStartTime =
+ parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
? DateTimeUtils.convertDatetimeStrToLong(
-
parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME),
- ZoneId.systemDefault())
+ parameters.getString(COLLECTOR_HISTORY_START_TIME),
ZoneId.systemDefault())
: Long.MIN_VALUE;
- collectEndTime =
-
parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME)
+ historicalDataCollectionEndTime =
+ parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
? DateTimeUtils.convertDatetimeStrToLong(
-
parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME),
- ZoneId.systemDefault())
+ parameters.getString(COLLECTOR_HISTORY_END_TIME),
ZoneId.systemDefault())
: Long.MAX_VALUE;
+
+ // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the
realtime only mode.
+ // realtime only mode -> (historicalDataCollectionTimeLowerBound !=
Long.MIN_VALUE)
+ //
+ // Ensure that all data in the data region is flushed to disk before
collecting data.
+ // This ensures the generation time of all newly generated TsFiles
(realtime data) after the
+ // invocation of flushDataRegionAllTsFiles() is later than the
creationTime of the pipe
+ // (historicalDataCollectionTimeLowerBound).
+ //
+ // Note that: the generation time of the TsFile is the time when the
TsFile is created, not
+ // the time when the data is flushed to the TsFile.
+ //
+ // Then we can use the generation time of the TsFile to determine whether
the data in the
+ // TsFile should be collected by comparing the generation time of the
TsFile with the
+ // historicalDataCollectionTimeLowerBound when starting the pipe in
realtime only mode.
+ //
+ // If we don't invoke flushDataRegionAllTsFiles() in the realtime only
mode, the data generated
+ // between the creation time of the pipe and the time when the pipe starts
will be lost.
+ if (historicalDataCollectionTimeLowerBound != Long.MIN_VALUE) {
+ flushDataRegionAllTsFiles();
+ }
+ }
+
+ private void flushDataRegionAllTsFiles() {
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion(new
DataRegionId(dataRegionId));
+ if (dataRegion == null) {
+ return;
+ }
+
+ dataRegion.writeLock("Pipe: create historical TsFile collector");
+ try {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ } finally {
+ dataRegion.writeUnlock();
+ }
}
@Override
@@ -88,7 +137,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends
PipeHistoricalDataR
return;
}
- dataRegion.writeLock("Pipe: collect historical TsFile");
+ dataRegion.writeLock("Pipe: start to collect historical TsFile");
try {
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -101,7 +150,8 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
.filter(
resource ->
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
- &&
isTsFileResourceOverlappedWithTimeRange(resource))
+ &&
isTsFileResourceOverlappedWithTimeRange(resource)
+ &&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
.map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.addAll(
@@ -109,7 +159,8 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
.filter(
resource ->
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
- &&
isTsFileResourceOverlappedWithTimeRange(resource))
+ &&
isTsFileResourceOverlappedWithTimeRange(resource)
+ &&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
.map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.forEach(
@@ -125,8 +176,22 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
}
private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource
resource) {
- return !(resource.getFileEndTime() < collectStartTime
- || collectEndTime < resource.getFileStartTime());
+ return !(resource.getFileEndTime() < historicalDataCollectionStartTime
+ || historicalDataCollectionEndTime < resource.getFileStartTime());
+ }
+
+ private boolean
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
+ try {
+ return historicalDataCollectionTimeLowerBound
+ <=
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
+ } catch (IOException e) {
+ LOGGER.warn(
+ String.format("failed to get the generation time of TsFile %s",
resource.getTsFilePath()),
+ e);
+ // If failed to get the generation time of the TsFile, we will collect
the data in the TsFile
+ // anyway.
+ return true;
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 481eb0e5ad7..3bd469907f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import java.util.HashMap;
import java.util.Map;
@@ -42,10 +41,6 @@ public class PipeBuilder {
public Map<TConsensusGroupId, PipeTask> build() {
final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
- final String pipeName = pipeStaticMeta.getPipeName();
- final PipeParameters collectorParameters =
pipeStaticMeta.getCollectorParameters();
- final PipeParameters processorParameters =
pipeStaticMeta.getProcessorParameters();
- final PipeParameters connectorParameters =
pipeStaticMeta.getConnectorParameters();
final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new
HashMap<>();
@@ -57,12 +52,9 @@ public class PipeBuilder {
consensusGroupIdToPipeTaskMap.put(
consensusGroupIdToPipeTaskMeta.getKey(),
new PipeTaskBuilder(
- pipeName,
+ pipeStaticMeta,
consensusGroupIdToPipeTaskMeta.getKey(),
- consensusGroupIdToPipeTaskMeta.getValue(),
- collectorParameters,
- processorParameters,
- connectorParameters)
+ consensusGroupIdToPipeTaskMeta.getValue())
.build());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 274b50e1a98..1886f489c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -25,41 +25,18 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
-import org.apache.iotdb.pipe.api.customizer.PipeParameters;
public class PipeTaskBuilder {
- private final String pipeName;
+ private final PipeStaticMeta pipeStaticMeta;
private final TConsensusGroupId dataRegionId;
private final PipeTaskMeta pipeTaskMeta;
- private final PipeParameters pipeCollectorParameters;
- private final PipeParameters pipeProcessorParameters;
- private final PipeParameters pipeConnectorParameters;
- PipeTaskBuilder(
- String pipeName,
- TConsensusGroupId dataRegionId,
- PipeTaskMeta pipeTaskMeta,
- PipeParameters pipeCollectorParameters,
- PipeParameters pipeProcessorParameters,
- PipeParameters pipeConnectorParameters) {
- this.pipeName = pipeName;
+ public PipeTaskBuilder(
+ PipeStaticMeta pipeStaticMeta, TConsensusGroupId dataRegionId,
PipeTaskMeta pipeTaskMeta) {
+ this.pipeStaticMeta = pipeStaticMeta;
this.dataRegionId = dataRegionId;
this.pipeTaskMeta = pipeTaskMeta;
- this.pipeCollectorParameters = pipeCollectorParameters;
- this.pipeProcessorParameters = pipeProcessorParameters;
- this.pipeConnectorParameters = pipeConnectorParameters;
- }
-
- public PipeTaskBuilder(
- TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta,
PipeStaticMeta pipeStaticMeta) {
- this(
- pipeStaticMeta.getPipeName(),
- dataRegionId,
- pipeTaskMeta,
- pipeStaticMeta.getCollectorParameters(),
- pipeStaticMeta.getProcessorParameters(),
- pipeStaticMeta.getConnectorParameters());
}
public PipeTask build() {
@@ -67,21 +44,26 @@ public class PipeTaskBuilder {
// we first build the collector and connector, then build the processor.
final PipeTaskCollectorStage collectorStage =
- new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta,
pipeCollectorParameters);
+ new PipeTaskCollectorStage(
+ dataRegionId,
+ pipeTaskMeta,
+ pipeStaticMeta.getCreationTime(),
+ pipeStaticMeta.getCollectorParameters());
final PipeTaskConnectorStage connectorStage =
- new PipeTaskConnectorStage(pipeConnectorParameters, pipeTaskMeta);
+ new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters(),
pipeTaskMeta);
// the processor connects the collector and connector.
final PipeTaskProcessorStage processorStage =
new PipeTaskProcessorStage(
- pipeName,
+ pipeStaticMeta.getPipeName(),
dataRegionId,
pipeTaskMeta,
collectorStage.getEventSupplier(),
collectorStage.getCollectorPendingQueue(),
- pipeProcessorParameters,
+ pipeStaticMeta.getProcessorParameters(),
connectorStage.getPipeConnectorPendingQueue());
- return new PipeTask(pipeName, dataRegionId, collectorStage,
processorStage, connectorStage);
+ return new PipeTask(
+ pipeStaticMeta.getPipeName(), dataRegionId, collectorStage,
processorStage, connectorStage);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 8146a3a3f28..60488c6d253 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -58,6 +58,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
public PipeTaskCollectorStage(
TConsensusGroupId dataRegionId,
PipeTaskMeta pipeTaskMeta,
+ long creationTime,
PipeParameters collectorParameters) {
// TODO: avoid if-else, use reflection to create collector all the time
if (collectorParameters
@@ -77,7 +78,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
.put(PipeCollectorConstant.DATA_REGION_KEY,
String.valueOf(dataRegionId.getId()));
collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
- this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta,
collectorPendingQueue);
+ this.pipeCollector =
+ new IoTDBDataRegionCollector(pipeTaskMeta, creationTime,
collectorPendingQueue);
} else {
this.collectorParameters = collectorParameters;