This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 84780e2d3f3 [IOTDB-5723] Pipe: progress index (#9446)(#9950)
84780e2d3f3 is described below
commit 84780e2d3f318c58ae24dd7358051f9296b1c5fe
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun May 28 07:56:57 2023 +0800
[IOTDB-5723] Pipe: progress index (#9446)(#9950)
Co-authored-by: yschengzi <[email protected]>
Co-authored-by: Xiangpeng Hu <[email protected]>
---
.../pipe/runtime/PipeRuntimeCoordinator.java | 5 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 9 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 10 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 7 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 9 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 3 +-
.../runtime/PipeHandleMetaChangeProcedureTest.java | 7 +-
.../consensus/iot/IoTConsensusServerImpl.java | 7 +
.../index/ComparableConsensusRequest.java | 26 ++++
.../commons/consensus/index/ProgressIndex.java | 79 ++++++++++
.../commons/consensus/index/ProgressIndexType.java | 79 ++++++++++
.../consensus/index/impl/IoTProgressIndex.java | 167 ++++++++++++++++++++
.../consensus/index/impl/MinimumProgressIndex.java | 87 +++++++++++
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 67 ++++----
.../commons/pipe/task/meta/PipeMetaDeSerTest.java | 7 +-
.../IoTConsensusDataRegionStateMachine.java | 11 +-
.../db/engine/storagegroup/TsFileProcessor.java | 6 +
.../db/engine/storagegroup/TsFileResource.java | 58 +++++++
.../plan/node/write/InsertMultiTabletsNode.java | 7 +
.../plan/planner/plan/node/write/InsertNode.java | 20 ++-
.../planner/plan/node/write/InsertRowsNode.java | 7 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 7 +
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 41 +++--
.../core/collector/IoTDBDataRegionCollector.java | 9 +-
.../PipeHistoricalDataRegionTsFileCollector.java | 22 ++-
.../realtime/PipeRealtimeDataRegionCollector.java | 11 ++
.../PipeRealtimeDataRegionHybridCollector.java | 4 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 7 +-
.../iotdb/db/pipe/core/event/EnrichedEvent.java | 76 ++++++++-
.../core/event/impl/PipeTabletInsertionEvent.java | 69 +++++---
.../core/event/impl/PipeTsFileInsertionEvent.java | 71 +++++++--
.../event/realtime/PipeRealtimeCollectEvent.java | 49 ++++--
.../realtime/PipeRealtimeCollectEventFactory.java | 4 +-
.../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 26 ++--
.../org/apache/iotdb/db/pipe/task/PipeTask.java | 7 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 15 +-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 11 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 3 +-
.../TsFileResourceProgressIndexTest.java | 173 +++++++++++++++++++++
.../collector/CachedSchemaPatternMatcherTest.java | 14 +-
.../core/collector/PipeRealtimeCollectTest.java | 12 +-
41 files changed, 1136 insertions(+), 173 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 3455a6c1369..2992f6dd31e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -71,7 +71,10 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
.forEach(
(regionId, pair) -> {
if (regionId.getType().equals(TConsensusGroupType.DataRegion)) {
- dataRegionGroupToOldAndNewLeaderPairMap.put(regionId, pair);
+ dataRegionGroupToOldAndNewLeaderPairMap.put(
+ regionId,
+ new Pair<>( // null or -1 means empty origin leader
+ pair.left == null ? -1 : pair.left, pair.right == null
? -1 : pair.right));
}
});
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 76ec9007ffa..2c9485c29fc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
@@ -210,7 +211,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (newDataRegionLeader != -1) {
consensusGroupIdToTaskMetaMap
.get(dataRegionGroupId)
- .setRegionLeader(newDataRegionLeader);
+ .setLeaderDataNodeId(newDataRegionLeader);
} else {
consensusGroupIdToTaskMetaMap.remove(dataRegionGroupId);
}
@@ -219,9 +220,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
// region group is newly added.
if (newDataRegionLeader != -1) {
consensusGroupIdToTaskMetaMap.put(
- // TODO: the progress index should be passed
from the leader
- // correctly
- dataRegionGroupId, new PipeTaskMeta(0,
newDataRegionLeader));
+ dataRegionGroupId,
+ new PipeTaskMeta(
+ new MinimumProgressIndex(),
newDataRegionLeader));
} else {
LOGGER.warn(
"The pipe task meta does not contain the
data region group {} or the data region group has already been removed",
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index db04d909f7d..b63e157f096 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -117,7 +117,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
for (final Map.Entry<TConsensusGroupId, PipeTaskMeta>
runtimeMetaOnConfigNode :
pipeTaskMetaMapOnConfigNode.entrySet()) {
- if (runtimeMetaOnConfigNode.getValue().getRegionLeader() !=
dataNodeId) {
+ if (runtimeMetaOnConfigNode.getValue().getLeaderDataNodeId() !=
dataNodeId) {
continue;
}
@@ -132,11 +132,13 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
// update progress index
- if (runtimeMetaOnConfigNode.getValue().getProgressIndex()
- < runtimeMetaFromDataNode.getProgressIndex()) {
+ if (!runtimeMetaOnConfigNode
+ .getValue()
+ .getProgressIndex()
+ .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
runtimeMetaOnConfigNode
.getValue()
- .setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+ .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
needWriteConsensusOnConfigNodes = true;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index e0de835fb07..b7ab5ef62d5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -102,9 +103,9 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.getLoadManager()
.getRegionLeaderMap()
.forEach(
- (region, leader) ->
- // TODO: make index configurable
- consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0,
leader)));
+ (regionGroup, regionLeaderNodeId) ->
+ consensusGroupIdToTaskMetaMap.put(
+ regionGroup, new PipeTaskMeta(new MinimumProgressIndex(),
regionLeaderNodeId)));
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 9a83e00acd7..9c3be2ac3a1 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
import org.apache.iotdb.common.rpc.thrift.ThrottleType;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
@@ -1050,7 +1051,7 @@ public class ConfigPhysicalPlanSerDeTest {
collectorAttributes.put("collector",
"org.apache.iotdb.pipe.collector.DefaultCollector");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+ PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(),
1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -1163,10 +1164,12 @@ public class ConfigPhysicalPlanSerDeTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 987)); // TODO: replace
with IoTConsensus
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 789)); // TODO: replace
with IoTConsensus
}
});
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 0246759c98c..51b771026c1 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -72,7 +73,7 @@ public class PipeInfoTest {
collectorAttributes.put("collector",
"org.apache.iotdb.pipe.collector.DefaultCollector");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+ PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(),
1);
Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 07d74e2d9ff..38783ea2e53 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -68,10 +69,12 @@ public class PipeHandleMetaChangeProcedureTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 987)); // TODO: replace
with IoTConsensus
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 789)); // TODO: replace
with IoTConsensus
}
});
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1cb06ba4d3f..8c8badf4be6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.IStateMachine;
@@ -646,6 +648,11 @@ public class IoTConsensusServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
+ if (request instanceof ComparableConsensusRequest) {
+ final IoTProgressIndex iotProgressIndex = new IoTProgressIndex();
+ iotProgressIndex.addSearchIndex(thisNode.getNodeId(), searchIndex.get()
+ 1);
+ ((ComparableConsensusRequest)
request).setProgressIndex(iotProgressIndex);
+ }
return new IndexedConsensusRequest(searchIndex.get() + 1,
Collections.singletonList(request));
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
new file mode 100644
index 00000000000..ff08623df38
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+public interface ComparableConsensusRequest {
+ ProgressIndex getProgressIndex();
+
+ void setProgressIndex(ProgressIndex progressIndex);
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
new file mode 100644
index 00000000000..b4a1f6aeca7
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface ProgressIndex {
+
+ /** serialize this progress index to the given byte buffer */
+ void serialize(ByteBuffer byteBuffer);
+
+ /** serialize this progress index to the given output stream */
+ void serialize(OutputStream stream) throws IOException;
+
+ /**
+ * A.isAfter(B) is true if and only if A is strictly greater than B.
+ *
+ * @param progressIndex the progress index to be compared
+ * @return true if and only if this progress index is strictly greater than
the given consensus
+ * index
+ */
+ boolean isAfter(ProgressIndex progressIndex);
+
+ /**
+ * A.equals(B) is true if and only if A is equal to B
+ *
+ * @param progressIndex the progress index to be compared
+ * @return true if and only if this progress index is equal to the given
progress index
+ */
+ boolean equals(ProgressIndex progressIndex);
+
+ /**
+ * A.equals(B) is true if and only if A is equal to B
+ *
+ * @param obj the object to be compared
+ * @return true if and only if this progress index is equal to the given
object
+ */
+ @Override
+ boolean equals(Object obj);
+
+ /**
+ * C = A.updateToMinimumIsAfterProgressIndex(B) where C should satisfy:
+ *
+ * <p>(C.equals(A) || C.isAfter(A)) is true
+ *
+ * <p>(C.equals(B) || C.isAfter(B)) is true
+ *
+ * <p>There is no D, such that D satisfies the above conditions and
C.isAfter(D) is true
+ *
+ * <p>The implementation of this function should be reflexive, that is
+ *
A.updateToMinimumIsAfterProgressIndex(B).equals(B.updateToMinimumIsAfterProgressIndex(A))
is
+ * true
+ *
+ * <p>Note: this function may modify the caller.
+ *
+ * @param progressIndex the progress index to be compared
+ * @return the minimum progress index after the given progress index and
this progress index
+ */
+ ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex);
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
new file mode 100644
index 00000000000..37d83a52efb
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum ProgressIndexType {
+ MINIMUM_CONSENSUS_INDEX((short) 1),
+ IOT_CONSENSUS_INDEX((short) 2),
+ ;
+
+ private final short type;
+
+ ProgressIndexType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(type, byteBuffer);
+ }
+
+ public void serialize(OutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(type, stream);
+ }
+
+ public static ProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ final short indexType = ReadWriteIOUtils.readShort(byteBuffer);
+ switch (indexType) {
+ case 1:
+ return MinimumProgressIndex.deserializeFrom(byteBuffer);
+ case 2:
+ return IoTProgressIndex.deserializeFrom(byteBuffer);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported progress index type %s.", indexType));
+ }
+ }
+
+ public static ProgressIndex deserializeFrom(InputStream stream) throws
IOException {
+ final short indexType = ReadWriteIOUtils.readShort(stream);
+ switch (indexType) {
+ case 1:
+ return MinimumProgressIndex.deserializeFrom(stream);
+ case 2:
+ return IoTProgressIndex.deserializeFrom(stream);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported progress index type %s.", indexType));
+ }
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
new file mode 100644
index 00000000000..d84ef20394f
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTProgressIndex implements ProgressIndex {
+
+ private final Map<Integer, Long> peerId2SearchIndex;
+
+ public IoTProgressIndex() {
+ peerId2SearchIndex = new HashMap<>();
+ }
+
+ public void addSearchIndex(Integer peerId, Long searchIndex) {
+ peerId2SearchIndex.put(peerId, searchIndex);
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
+ for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet())
{
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ }
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
+ for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet())
{
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ }
+
+ @Override
+ public boolean isAfter(ProgressIndex progressIndex) {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return true;
+ }
+
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return false;
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+ .noneMatch(
+ entry ->
+
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+ ||
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
+ <= entry.getValue());
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return false;
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ return thisIoTProgressIndex.peerId2SearchIndex.size()
+ == thatIoTProgressIndex.peerId2SearchIndex.size()
+ && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+ .allMatch(
+ entry ->
+
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+ && thisIoTProgressIndex
+ .peerId2SearchIndex
+ .get(entry.getKey())
+ .equals(entry.getValue()));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof IoTProgressIndex)) {
+ return false;
+ }
+ return this.equals((IoTProgressIndex) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
+ if (!(progressIndex instanceof IoTProgressIndex)) {
+ return this;
+ }
+
+ final IoTProgressIndex thisIoTProgressIndex = this;
+ final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex)
progressIndex;
+ thatIoTProgressIndex.peerId2SearchIndex.forEach(
+ (thatK, thatV) ->
+ thisIoTProgressIndex.peerId2SearchIndex.compute(
+ thatK, (thisK, thisV) -> (thisV == null ? thatV :
Math.max(thisV, thatV))));
+ return this;
+ }
+
+ public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; i++) {
+ final int peerId = ReadWriteIOUtils.readInt(byteBuffer);
+ final long searchIndex = ReadWriteIOUtils.readLong(byteBuffer);
+ ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+ }
+ return ioTProgressIndex;
+ }
+
+ public static IoTProgressIndex deserializeFrom(InputStream stream) throws
IOException {
+ final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+ final int size = ReadWriteIOUtils.readInt(stream);
+ for (int i = 0; i < size; i++) {
+ final int peerId = ReadWriteIOUtils.readInt(stream);
+ final long searchIndex = ReadWriteIOUtils.readLong(stream);
+ ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+ }
+ return ioTProgressIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "IoTProgressIndex{" + "peerId2SearchIndex=" + peerId2SearchIndex +
'}';
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
new file mode 100644
index 00000000000..e36b990eae4
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class MinimumProgressIndex implements ProgressIndex {
+
+ public MinimumProgressIndex() {}
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+ }
+
+ @Override
+ public boolean isAfter(ProgressIndex progressIndex) {
+ return false;
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ return progressIndex instanceof MinimumProgressIndex;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ return getClass() == obj.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
+ return progressIndex == null ? this : progressIndex;
+ }
+
+ public static MinimumProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ return new MinimumProgressIndex();
+ }
+
+ public static MinimumProgressIndex deserializeFrom(InputStream stream) {
+ return new MinimumProgressIndex();
+ }
+
+ @Override
+ public String toString() {
+ return "MinimumProgressIndex{}";
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index bb5149fb1f2..d4f43d15475 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.pipe.task.meta;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -34,28 +36,33 @@ import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
- // TODO: replace it with consensus index
- private final AtomicLong progressIndex = new AtomicLong(0L);
- private final AtomicInteger regionLeader = new AtomicInteger(0);
+ private final AtomicReference<ProgressIndex> progressIndex = new
AtomicReference<>();
+ private final AtomicInteger leaderDataNodeId = new AtomicInteger(0);
private final Queue<PipeRuntimeException> exceptionMessages = new
ConcurrentLinkedQueue<>();
- private PipeTaskMeta() {}
-
- public PipeTaskMeta(long progressIndex, int regionLeader) {
+ public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int
leaderDataNodeId) {
this.progressIndex.set(progressIndex);
- this.regionLeader.set(regionLeader);
+ this.leaderDataNodeId.set(leaderDataNodeId);
}
- public long getProgressIndex() {
+ public ProgressIndex getProgressIndex() {
return progressIndex.get();
}
- public int getRegionLeader() {
- return regionLeader.get();
+ public void updateProgressIndex(ProgressIndex updateIndex) {
+ progressIndex.updateAndGet(index ->
index.updateToMinimumIsAfterProgressIndex(updateIndex));
+ }
+
+ public int getLeaderDataNodeId() {
+ return leaderDataNodeId.get();
+ }
+
+ public void setLeaderDataNodeId(int leaderDataNodeId) {
+ this.leaderDataNodeId.set(leaderDataNodeId);
}
public Iterable<PipeRuntimeException> getExceptionMessages() {
@@ -70,17 +77,9 @@ public class PipeTaskMeta {
exceptionMessages.clear();
}
- public void setProgressIndex(long progressIndex) {
- this.progressIndex.set(progressIndex);
- }
-
- public void setRegionLeader(int regionLeader) {
- this.regionLeader.set(regionLeader);
- }
-
public void serialize(DataOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(progressIndex.get(), outputStream);
- ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ progressIndex.get().serialize(outputStream);
+ ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
ReadWriteIOUtils.write(
@@ -90,8 +89,8 @@ public class PipeTaskMeta {
}
public void serialize(FileOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(progressIndex.get(), outputStream);
- ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ progressIndex.get().serialize(outputStream);
+ ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
ReadWriteIOUtils.write(
@@ -101,9 +100,9 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
- final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
- PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
- PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+ final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(byteBuffer);
+ final int leaderDataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+ final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
@@ -117,9 +116,9 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(InputStream inputStream) throws
IOException {
- final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
- PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
- PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+ final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(inputStream);
+ final int leaderDataNodeId = ReadWriteIOUtils.readInt(inputStream);
+ final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final boolean critical = ReadWriteIOUtils.readBool(inputStream);
@@ -141,14 +140,14 @@ public class PipeTaskMeta {
return false;
}
PipeTaskMeta that = (PipeTaskMeta) obj;
- return progressIndex.get() == that.progressIndex.get()
- && regionLeader.get() == that.regionLeader.get()
+ return progressIndex.get().equals(that.progressIndex.get())
+ && leaderDataNodeId.get() == that.leaderDataNodeId.get()
&& Arrays.equals(exceptionMessages.toArray(),
that.exceptionMessages.toArray());
}
@Override
public int hashCode() {
- return Objects.hash(progressIndex, regionLeader, exceptionMessages);
+ return Objects.hash(progressIndex, leaderDataNodeId, exceptionMessages);
}
@Override
@@ -157,8 +156,8 @@ public class PipeTaskMeta {
+ "progressIndex='"
+ progressIndex
+ '\''
- + ", regionLeader='"
- + regionLeader
+ + ", leaderDataNodeId='"
+ + leaderDataNodeId
+ '\''
+ ", exceptionMessages="
+ exceptionMessages
diff --git
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
index 08e7f86007c..b9fc0ef7e5d 100644
---
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
+++
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.task.meta;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.junit.Assert;
import org.junit.Test;
@@ -59,10 +60,12 @@ public class PipeMetaDeSerTest {
{
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
- new PipeTaskMeta(789, 987));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 987)); // TODO: replace
with IoTProgressIndex;
put(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
- new PipeTaskMeta(456, 789));
+ new PipeTaskMeta(
+ new MinimumProgressIndex(), 789)); // TODO: replace
with IoTProgressIndex;
}
});
ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 26d52eed22f..8fa8cb7c12b 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import
org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
@@ -84,7 +86,14 @@ public class IoTConsensusDataRegionStateMachine extends
DataRegionStateMachine {
batchRequest.getEndSyncIndex(),
batchRequest.getRequests().size());
for (IndexedConsensusRequest indexedRequest :
batchRequest.getRequests()) {
- deserializedRequest.add(grabInsertNode(indexedRequest));
+ final PlanNode planNode = grabInsertNode(indexedRequest);
+ if (planNode instanceof ComparableConsensusRequest) {
+ final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+ ioTProgressIndex.addSearchIndex(
+ Integer.parseInt(batchRequest.getSourcePeerId()),
indexedRequest.getSearchIndex());
+ ((ComparableConsensusRequest)
planNode).setProgressIndex(ioTProgressIndex);
+ }
+ deserializedRequest.add(planNode);
}
result = deserializedRequest;
} else {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f89f28f9b2f..a124a8aa2c8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -301,6 +301,9 @@ public class TsFileProcessor {
tsFileResource.updateEndTime(
insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
}
+
+ tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime()
- startTime);
}
@@ -413,6 +416,9 @@ public class TsFileProcessor {
tsFileResource.updateEndTime(
insertTabletNode.getDeviceID().toStringID(),
insertTabletNode.getTimes()[end - 1]);
}
+
+ tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime()
- startTime);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0e5949f41b4..774106dd593 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -153,6 +156,8 @@ public class TsFileResource {
*/
private TsFileResource originTsFileResource;
+ private ProgressIndex maxProgressIndex;
+
public TsFileResource() {}
public TsFileResource(TsFileResource other) throws IOException {
@@ -170,6 +175,7 @@ public class TsFileResource {
this.minPlanIndex = other.minPlanIndex;
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
this.tsFileSize = other.tsFileSize;
+ this.maxProgressIndex = other.maxProgressIndex;
}
/** for sealed TsFile, call setClosed to close TsFileResource */
@@ -242,8 +248,27 @@ public class TsFileResource {
if (modFile != null && modFile.exists()) {
String modFileName = new File(modFile.getFilePath()).getName();
ReadWriteIOUtils.write(modFileName, outputStream);
+ } else {
+ // make the first "inputStream.available() > 0" in deserialize() happy.
+ //
+ // if modFile not exist, write null (-1). the first
"inputStream.available() > 0" in
+ // deserialize() and deserializeFromOldFile() detect -1 and
deserialize modFileName as null
+ // and skip the modFile deserialize.
+ //
+ // this make sure the first and the second "inputStream.available() >
0" in deserialize()
+ // will always be called... which is a bit ugly but allows the
following variable
+ // maxProgressIndex to be deserialized correctly.
+ ReadWriteIOUtils.write((String) null, outputStream);
+ }
+
+ if (maxProgressIndex != null) {
+ ReadWriteIOUtils.write(true, outputStream);
+ maxProgressIndex.serialize(outputStream);
+ } else {
+ ReadWriteIOUtils.write(false, outputStream);
}
}
+
File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
fsFactory.deleteIfExists(dest);
@@ -258,6 +283,7 @@ public class TsFileResource {
timeIndex = ITimeIndex.createTimeIndex(inputStream);
maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
+
if (inputStream.available() > 0) {
String modFileName = ReadWriteIOUtils.readString(inputStream);
if (modFileName != null) {
@@ -265,6 +291,13 @@ public class TsFileResource {
modFile = new ModificationFile(modF.getPath());
}
}
+
+ if (inputStream.available() > 0) {
+ final boolean hasMaxProgressIndex =
ReadWriteIOUtils.readBool(inputStream);
+ if (hasMaxProgressIndex) {
+ maxProgressIndex = ProgressIndexType.deserializeFrom(inputStream);
+ }
+ }
}
// upgrade from v0.12 to v0.13, we need to rewrite the TsFileResource if
the previous time index
@@ -310,6 +343,12 @@ public class TsFileResource {
modFile = new ModificationFile(modF.getPath());
}
}
+ if (inputStream.available() > 0) {
+ final boolean hasMaxProgressIndex =
ReadWriteIOUtils.readBool(inputStream);
+ if (hasMaxProgressIndex) {
+ maxProgressIndex = ProgressIndexType.deserializeFrom(inputStream);
+ }
+ }
}
}
@@ -1122,4 +1161,23 @@ public class TsFileResource {
public boolean isFileInList() {
return prev != null || next != null;
}
+
+ public void updateProgressIndex(ProgressIndex progressIndex) {
+ if (progressIndex == null) {
+ return;
+ }
+
+ maxProgressIndex =
+ (maxProgressIndex == null
+ ? progressIndex
+ :
maxProgressIndex.updateToMinimumIsAfterProgressIndex(progressIndex));
+ }
+
+ public ProgressIndex getMaxProgressIndexAfterClose() throws
IllegalStateException {
+ if (status.equals(TsFileResourceStatus.UNCLOSED)) {
+ throw new IllegalStateException(
+ "Should not get progress index from a unclosing TsFileResource.");
+ }
+ return maxProgressIndex == null ? new MinimumProgressIndex() :
maxProgressIndex;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 5a4e198b77c..73308092ecb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -263,4 +264,10 @@ public class InsertMultiTabletsNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ this.progressIndex = progressIndex;
+ insertTabletNodeList.forEach(node -> node.setProgressIndex(progressIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index c191f03837a..df0a01c6271 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -40,7 +42,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-public abstract class InsertNode extends WritePlanNode {
+public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
/** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
@@ -73,6 +75,8 @@ public abstract class InsertNode extends WritePlanNode {
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
+ protected ProgressIndex progressIndex;
+
protected InsertNode(PlanNodeId id) {
super(id);
}
@@ -267,6 +271,20 @@ public abstract class InsertNode extends WritePlanNode {
}
// endregion
+ // region progress index
+
+ @Override
+ public final ProgressIndex getProgressIndex() {
+ return progressIndex;
+ }
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ this.progressIndex = progressIndex;
+ }
+
+ // endregion
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 050839f6428..5fcaf1b44ac 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -240,4 +241,10 @@ public class InsertRowsNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ this.progressIndex = progressIndex;
+ insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setProgressIndex(progressIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index dd788ea4db0..5cdbf43b23c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -292,4 +293,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public long getMinTime() {
throw new NotImplementedException();
}
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ this.progressIndex = progressIndex;
+ insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setProgressIndex(progressIndex));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ff33b450c7..10e8679ac83 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
@@ -65,6 +67,7 @@ import java.util.stream.Collectors;
public class PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskAgent.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final PipeMetaKeeper pipeMetaKeeper;
private final PipeTaskManager pipeTaskManager;
@@ -76,7 +79,6 @@ public class PipeTaskAgent {
////////////////////////// Pipe Task Management Entry
//////////////////////////
- // TODO: handle progress index
public synchronized void handlePipeMetaChanges(List<PipeMeta>
pipeMetaListFromConfigNode) {
// do nothing if data node is removing or removed
if (PipeAgent.runtime().isShutdown()) {
@@ -156,11 +158,7 @@ public class PipeTaskAgent {
// if task meta does not exist on data node, create a new task
if (taskMetaOnDataNode == null) {
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
+ createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta,
taskMetaFromConfigNode);
// we keep the new created task's status consistent with the status
recorded in data node's
// pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
// is not reliable, but we will have a check later to make sure the
status is correct.
@@ -171,16 +169,12 @@ public class PipeTaskAgent {
}
// if task meta exists on data node, check if it has changed
- final int regionLeaderFromConfigNode =
taskMetaFromConfigNode.getRegionLeader();
- final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+ final int dataNodeIdFromConfigNode =
taskMetaFromConfigNode.getLeaderDataNodeId();
+ final int dataNodeIdOnDataNode =
taskMetaOnDataNode.getLeaderDataNodeId();
- if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+ if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
+ createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta,
taskMetaFromConfigNode);
// we keep the new created task's status consistent with the status
recorded in data node's
// pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
// is not reliable, but we will have a check later to make sure the
status is correct.
@@ -511,19 +505,22 @@ public class PipeTaskAgent {
///////////////////////// Manage by dataRegionGroupId
/////////////////////////
private void createPipeTask(
- TConsensusGroupId dataRegionGroupId,
+ TConsensusGroupId consensusGroupId,
PipeStaticMeta pipeStaticMeta,
- long progressIndex,
- int dataRegionId) {
- final PipeTask pipeTask =
- new PipeTaskBuilder(Integer.toString(dataRegionId),
pipeStaticMeta).build();
- pipeTask.create();
- pipeTaskManager.addPipeTask(pipeStaticMeta, dataRegionGroupId, pipeTask);
+ PipeTaskMeta pipeTaskMeta) {
+ if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
+ final PipeTask pipeTask =
+ new PipeTaskBuilder(consensusGroupId, pipeTaskMeta,
pipeStaticMeta).build();
+ pipeTask.create();
+ pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
+ }
pipeMetaKeeper
.getPipeMeta(pipeStaticMeta.getPipeName())
.getRuntimeMeta()
.getConsensusGroupIdToTaskMetaMap()
- .put(dataRegionGroupId, new PipeTaskMeta(progressIndex, dataRegionId));
+ .put(
+ consensusGroupId,
+ new PipeTaskMeta(pipeTaskMeta.getProgressIndex(),
pipeTaskMeta.getLeaderDataNodeId()));
}
private void dropPipeTask(TConsensusGroupId dataRegionGroupId,
PipeStaticMeta pipeStaticMeta) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index fe1c92638be..fa7b07c182b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.core.collector;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -44,10 +45,12 @@ public class IoTDBDataRegionCollector implements
PipeCollector {
private int dataRegionId;
- public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event>
collectorPendingQueue) {
+ public IoTDBDataRegionCollector(
+ PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event>
collectorPendingQueue) {
hasBeenStarted = new AtomicBoolean(false);
- realtimeCollector = new
PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
- historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+ realtimeCollector =
+ new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta,
collectorPendingQueue);
+ historicalCollector = new
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index ed0d37c6c14..9e796bddc77 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.core.collector.historical;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
@@ -37,10 +39,18 @@ import java.util.stream.Collectors;
public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
+ private final PipeTaskMeta pipeTaskMeta;
+ private final ProgressIndex startIndex;
+
private int dataRegionId;
private Queue<PipeTsFileInsertionEvent> pendingQueue;
+ public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.startIndex = pipeTaskMeta.getProgressIndex();
+ }
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -71,16 +81,18 @@ public class PipeHistoricalDataRegionTsFileCollector
implements PipeCollector {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) +
tsFileManager.size(false));
pendingQueue.addAll(
tsFileManager.getTsFileList(true).stream()
- .map(PipeTsFileInsertionEvent::new)
+ .filter(resource ->
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+ .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
- .map(PipeTsFileInsertionEvent::new)
+ .filter(resource ->
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+ .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta))
.collect(Collectors.toList()));
pendingQueue.forEach(
- event ->
- event.increaseReferenceCount(
- PipeHistoricalDataRegionTsFileCollector.class.getName()));
+ event -> {
+
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
+ });
} finally {
tsFileManager.readUnlock();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 41e9f5d76fb..fd790b718cd 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -29,9 +30,15 @@ import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfig
public abstract class PipeRealtimeDataRegionCollector implements PipeCollector
{
+ protected final PipeTaskMeta pipeTaskMeta;
+
protected String pattern;
protected String dataRegionId;
+ public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
+ this.pipeTaskMeta = pipeTaskMeta;
+ }
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -64,6 +71,10 @@ public abstract class PipeRealtimeDataRegionCollector
implements PipeCollector {
return pattern;
}
+ public final PipeTaskMeta getPipeTaskMeta() {
+ return pipeTaskMeta;
+ }
+
@Override
public String toString() {
return "PipeRealtimeDataRegionCollector{"
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 898b93cb22d..4ec7ed92261 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -44,7 +45,8 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
private final ListenableUnblockingPendingQueue<Event> pendingQueue;
public PipeRealtimeDataRegionHybridCollector(
- ListenableUnblockingPendingQueue<Event> pendingQueue) {
+ PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event>
pendingQueue) {
+ super(pipeTaskMeta);
this.pendingQueue = pendingQueue;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 2d805310f30..3e264ce02e9 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -53,8 +53,11 @@ public class PipeDataRegionAssigner {
.match(event)
.forEach(
collector -> {
-
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
- collector.collect(event);
+ final PipeRealtimeCollectEvent copiedEvent =
+ event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ collector.getPipeTaskMeta());
+
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ collector.collect(copiedEvent);
});
event.gcSchemaInfo();
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fa9e765ca25..fc15200b603 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -19,20 +19,73 @@
package org.apache.iotdb.db.pipe.core.event;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* EnrichedEvent is an event that can be enriched with additional runtime
information. The
* additional information mainly includes the reference count of the event.
*/
-public interface EnrichedEvent {
+public abstract class EnrichedEvent implements Event {
+
+ private final AtomicInteger referenceCount;
+
+ private final PipeTaskMeta pipeTaskMeta;
+
+ public EnrichedEvent(PipeTaskMeta pipeTaskMeta) {
+ referenceCount = new AtomicInteger(0);
+ this.pipeTaskMeta = pipeTaskMeta;
+ }
+
+ /**
+ * increase the reference count of this event. when the reference count is
positive, the data in
+ * the resource of this event should be safe to use.
+ *
+ * @param holderMessage the message of the invoker
+ * @return true if the reference count is increased successfully, false if
the event is not
+ */
+ public final boolean increaseReferenceCount(String holderMessage) {
+ boolean isSuccessful = true;
+ synchronized (this) {
+ if (referenceCount.get() == 0) {
+ isSuccessful = increaseResourceReferenceCount(holderMessage);
+ }
+ referenceCount.incrementAndGet();
+ }
+ return isSuccessful;
+ }
/**
- * Increase the reference count of this event.
+ * Increase the reference count of the resource of this event.
*
* @param holderMessage the message of the invoker
* @return true if the reference count is increased successfully, false if
the event is not
* controlled by the invoker, which means the data stored in the event
is not safe to use
*/
- boolean increaseReferenceCount(String holderMessage);
+ public abstract boolean increaseResourceReferenceCount(String holderMessage);
+
+ /**
+ * Decrease the reference count of this event. If the reference count is
decreased to 0, the event
+ * can be recycled and the data stored in the event is not safe to use, the
processing progress of
+ * the event should be reported to the pipe task meta.
+ *
+ * @param holderMessage the message of the invoker
+ * @return true if the reference count is decreased successfully, false
otherwise
+ */
+ public final boolean decreaseReferenceCount(String holderMessage) {
+ boolean isSuccessful = true;
+ synchronized (this) {
+ if (referenceCount.get() == 1) {
+ isSuccessful = decreaseResourceReferenceCount(holderMessage);
+ reportProgress();
+ }
+ referenceCount.decrementAndGet();
+ }
+ return isSuccessful;
+ }
/**
* Decrease the reference count of this event. If the reference count is
decreased to 0, the event
@@ -41,14 +94,25 @@ public interface EnrichedEvent {
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false
otherwise
*/
- boolean decreaseReferenceCount(String holderMessage);
+ public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+
+ private void reportProgress() {
+ if (pipeTaskMeta != null) {
+ pipeTaskMeta.updateProgressIndex(getProgressIndex());
+ }
+ }
+
+ public abstract ProgressIndex getProgressIndex();
/**
* Get the reference count of this event.
*
* @return the reference count
*/
- int getReferenceCount();
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
- // TODO: ConsensusIndex getConsensusIndex();
+ public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ PipeTaskMeta pipeTaskMeta);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 99d9da055ee..fb9788d1a95 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -35,37 +37,32 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent implements TabletInsertionEvent,
EnrichedEvent {
+public class PipeTabletInsertionEvent extends EnrichedEvent implements
TabletInsertionEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
private final WALEntryHandler walEntryHandler;
+ private final ProgressIndex progressIndex;
- public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler) {
+ public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler,
ProgressIndex progressIndex) {
+ this(walEntryHandler, progressIndex, null);
+ }
+
+ private PipeTabletInsertionEvent(
+ WALEntryHandler walEntryHandler, ProgressIndex progressIndex,
PipeTaskMeta pipeTaskMeta) {
+ super(pipeTaskMeta);
this.walEntryHandler = walEntryHandler;
+ this.progressIndex = progressIndex;
}
public InsertNode getInsertNode() throws WALPipeException {
return walEntryHandler.getValue();
}
- @Override
- public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
+ /////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean increaseReferenceCount(String holderMessage) {
+ public boolean increaseResourceReferenceCount(String holderMessage) {
try {
PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(),
walEntryHandler);
return true;
@@ -80,7 +77,7 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
try {
PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
return true;
@@ -95,12 +92,42 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
}
@Override
- public int getReferenceCount() {
- return
PipeResourceManager.wal().getReferenceCount(walEntryHandler.getMemTableId());
+ public ProgressIndex getProgressIndex() {
+ return progressIndex;
+ }
+
+ @Override
+ public PipeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ PipeTaskMeta pipeTaskMeta) {
+ return new PipeTabletInsertionEvent(walEntryHandler, progressIndex,
pipeTaskMeta);
+ }
+
+ /////////////////////////// TabletInsertionEvent ///////////////////////////
+
+ @Override
+ public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer) {
+ throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ /////////////////////////// Object ///////////////////////////
+
@Override
public String toString() {
- return "PipeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler
+ '}';
+ return "PipeTabletInsertionEvent{"
+ + "walEntryHandler="
+ + walEntryHandler
+ + ", progressIndex="
+ + progressIndex
+ + '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 65f78cebac2..4b346b0f915 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
@@ -32,14 +35,23 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent,
EnrichedEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent implements
TsFileInsertionEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
+ private final TsFileResource resource;
private File tsFile;
+
private final AtomicBoolean isClosed;
public PipeTsFileInsertionEvent(TsFileResource resource) {
+ this(resource, null);
+ }
+
+ public PipeTsFileInsertionEvent(TsFileResource resource, PipeTaskMeta
pipeTaskMeta) {
+ super(pipeTaskMeta);
+
+ this.resource = resource;
tsFile = resource.getTsFile();
isClosed = new AtomicBoolean(resource.isClosed());
@@ -72,20 +84,11 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
return tsFile;
}
- @Override
- public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public TsFileInsertionEvent
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
+ /////////////////////////// EnrichedEvent ///////////////////////////
@Override
- public boolean increaseReferenceCount(String holderMessage) {
+ public boolean increaseResourceReferenceCount(String holderMessage) {
try {
- // TODO: increase reference count for mods & resource files
tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
return true;
} catch (Exception e) {
@@ -99,7 +102,7 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
try {
PipeResourceManager.file().decreaseFileReference(tsFile);
return true;
@@ -114,12 +117,48 @@ public class PipeTsFileInsertionEvent implements
TsFileInsertionEvent, EnrichedE
}
@Override
- public int getReferenceCount() {
- return PipeResourceManager.file().getFileReferenceCount(tsFile);
+ public ProgressIndex getProgressIndex() {
+ try {
+ waitForTsFileClose();
+ return resource.getMaxProgressIndexAfterClose();
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ String.format(
+ "Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath()));
+ Thread.currentThread().interrupt();
+ return new MinimumProgressIndex();
+ }
+ }
+
+ @Override
+ public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ PipeTaskMeta pipeTaskMeta) {
+ return new PipeTsFileInsertionEvent(resource, pipeTaskMeta);
+ }
+
+ /////////////////////////// TsFileInsertionEvent ///////////////////////////
+
+ @Override
+ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public TsFileInsertionEvent
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+ throw new UnsupportedOperationException("Not implemented yet");
}
+ /////////////////////////// Object ///////////////////////////
+
@Override
public String toString() {
- return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
+ return "PipeTsFileInsertionEvent{"
+ + "resource="
+ + resource
+ + ", tsFile="
+ + tsFile
+ + ", isClosed="
+ + isClosed
+ + '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 0c63131e6e9..e9887cc88bf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,20 +19,31 @@
package org.apache.iotdb.db.pipe.core.event.realtime;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.Map;
-public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
+/**
+ * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with
the information of
+ * TsFileEpoch and schema info. It only exists in the realtime event collector.
+ */
+public class PipeRealtimeCollectEvent extends EnrichedEvent {
- private final Event event;
+ private final EnrichedEvent event;
private final TsFileEpoch tsFileEpoch;
private Map<String, String[]> device2Measurements;
public PipeRealtimeCollectEvent(
- Event event, TsFileEpoch tsFileEpoch, Map<String, String[]>
device2Measurements) {
+ EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]>
device2Measurements) {
+ // pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeCollectEvent
+ // is only used in the realtime event collector, which does not need to
report the progress
+ // of the event, so the pipeTaskMeta is always null.
+ super(null);
+
this.event = event;
this.tsFileEpoch = tsFileEpoch;
this.device2Measurements = device2Measurements;
@@ -55,24 +66,38 @@ public class PipeRealtimeCollectEvent implements Event,
EnrichedEvent {
}
@Override
- public boolean increaseReferenceCount(String holderMessage) {
- return !(event instanceof EnrichedEvent)
- || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+ public boolean increaseResourceReferenceCount(String holderMessage) {
+ return event.increaseResourceReferenceCount(holderMessage);
+ }
+
+ @Override
+ public boolean decreaseResourceReferenceCount(String holderMessage) {
+ return event.decreaseResourceReferenceCount(holderMessage);
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
- return !(event instanceof EnrichedEvent)
- || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+ public ProgressIndex getProgressIndex() {
+ return event.getProgressIndex();
}
@Override
- public int getReferenceCount() {
- return event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getReferenceCount() : 0;
+ public PipeRealtimeCollectEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ PipeTaskMeta pipeTaskMeta) {
+ return new PipeRealtimeCollectEvent(
+
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
+ this.tsFileEpoch,
+ this.device2Measurements);
}
@Override
public String toString() {
- return "PipeRealtimeCollectEvent{" + "event=" + event + ", tsFileEpoch=" +
tsFileEpoch + '}';
+ return "PipeRealtimeCollectEvent{"
+ + "event="
+ + event
+ + ", tsFileEpoch="
+ + tsFileEpoch
+ + ", device2Measurements="
+ + device2Measurements
+ + '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index a4b453e4322..937e3073a27 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -37,7 +37,9 @@ public class PipeRealtimeCollectEventFactory {
public static PipeRealtimeCollectEvent createCollectEvent(
WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
- new PipeTabletInsertionEvent(walEntryHandler), insertNode, resource);
+ new PipeTabletInsertionEvent(walEntryHandler,
insertNode.getProgressIndex()),
+ insertNode,
+ resource);
}
private PipeRealtimeCollectEventFactory() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 447ae5bdf09..481eb0e5ad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import java.util.HashMap;
import java.util.Map;
public class PipeBuilder {
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final PipeMeta pipeMeta;
@@ -49,16 +52,19 @@ public class PipeBuilder {
final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
for (Map.Entry<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToPipeTaskMeta :
pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
- consensusGroupIdToPipeTaskMap.put(
- consensusGroupIdToPipeTaskMeta.getKey(),
- new PipeTaskBuilder(
- pipeName,
-
Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
- // TODO:
consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
- collectorParameters,
- processorParameters,
- connectorParameters)
- .build());
+ if (consensusGroupIdToPipeTaskMeta.getValue().getLeaderDataNodeId()
+ == CONFIG.getDataNodeId()) {
+ consensusGroupIdToPipeTaskMap.put(
+ consensusGroupIdToPipeTaskMeta.getKey(),
+ new PipeTaskBuilder(
+ pipeName,
+ consensusGroupIdToPipeTaskMeta.getKey(),
+ consensusGroupIdToPipeTaskMeta.getValue(),
+ collectorParameters,
+ processorParameters,
+ connectorParameters)
+ .build());
+ }
}
return consensusGroupIdToPipeTaskMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 26475c5c4b4..887b31a6e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
public class PipeTask {
private final String pipeName;
- private final String dataRegionId;
+ private final TConsensusGroupId dataRegionId;
private final PipeTaskStage collectorStage;
private final PipeTaskStage processorStage;
@@ -32,7 +33,7 @@ public class PipeTask {
PipeTask(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
PipeTaskStage collectorStage,
PipeTaskStage processorStage,
PipeTaskStage connectorStage) {
@@ -68,7 +69,7 @@ public class PipeTask {
connectorStage.stop();
}
- public String getDataRegionId() {
+ public TConsensusGroupId getDataRegionId() {
return dataRegionId;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 51e429b0455..d42d62e5f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
@@ -28,28 +30,33 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
public class PipeTaskBuilder {
private final String pipeName;
- private final String dataRegionId;
+ private final TConsensusGroupId dataRegionId;
+ private final PipeTaskMeta pipeTaskMeta;
private final PipeParameters pipeCollectorParameters;
private final PipeParameters pipeProcessorParameters;
private final PipeParameters pipeConnectorParameters;
PipeTaskBuilder(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
+ PipeTaskMeta pipeTaskMeta,
PipeParameters pipeCollectorParameters,
PipeParameters pipeProcessorParameters,
PipeParameters pipeConnectorParameters) {
this.pipeName = pipeName;
this.dataRegionId = dataRegionId;
+ this.pipeTaskMeta = pipeTaskMeta;
this.pipeCollectorParameters = pipeCollectorParameters;
this.pipeProcessorParameters = pipeProcessorParameters;
this.pipeConnectorParameters = pipeConnectorParameters;
}
- public PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+ public PipeTaskBuilder(
+ TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta,
PipeStaticMeta pipeStaticMeta) {
this(
pipeStaticMeta.getPipeName(),
dataRegionId,
+ pipeTaskMeta,
pipeStaticMeta.getCollectorParameters(),
pipeStaticMeta.getProcessorParameters(),
pipeStaticMeta.getConnectorParameters());
@@ -60,7 +67,7 @@ public class PipeTaskBuilder {
// we first build the collector and connector, then build the processor.
final PipeTaskCollectorStage collectorStage =
- new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+ new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta,
pipeCollectorParameters);
final PipeTaskConnectorStage connectorStage =
new PipeTaskConnectorStage(pipeConnectorParameters);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a4866e87e8e..3dd445c7489 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
@@ -53,7 +55,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeCollector pipeCollector;
- public PipeTaskCollectorStage(String dataRegionId, PipeParameters
collectorParameters) {
+ public PipeTaskCollectorStage(
+ TConsensusGroupId dataRegionId,
+ PipeTaskMeta pipeTaskMeta,
+ PipeParameters collectorParameters) {
// TODO: avoid if-else, use reflection to create collector all the time
if (collectorParameters
.getStringOrDefault(
@@ -69,10 +74,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
// collector
this.collectorParameters
.getAttribute()
- .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+ .put(PipeCollectorConstant.DATA_REGION_KEY,
String.valueOf(dataRegionId.getId()));
collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
- this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+ this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta,
collectorPendingQueue);
} else {
this.collectorParameters = collectorParameters;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6bbec4480c0..5542b943d59 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -60,7 +61,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
*/
public PipeTaskProcessorStage(
String pipeName,
- String dataRegionId,
+ TConsensusGroupId dataRegionId,
EventSupplier pipeCollectorInputEventSupplier,
@Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
new file mode 100644
index 00000000000..8a9957cf47a
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class TsFileResourceProgressIndexTest {
+ private final File file =
+ new File(
+
TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1,
1, 1));
+ private final TsFileResource tsFileResource = new TsFileResource(file);
+ private final Map<String, Integer> deviceToIndex = new HashMap<>();
+ private final long[] startTimes = new long[DEVICE_NUM];
+ private final long[] endTimes = new long[DEVICE_NUM];
+ private static final int DEVICE_NUM = 100;
+
+ private final List<ProgressIndex> indexList = new ArrayList<>();
+ private static final int INDEX_NUM = 1000;
+
+ @Before
+ public void setUp() {
+ IntStream.range(0, DEVICE_NUM).forEach(i -> deviceToIndex.put("root.sg.d"
+ i, i));
+ DeviceTimeIndex deviceTimeIndex = new DeviceTimeIndex(deviceToIndex,
startTimes, endTimes);
+ IntStream.range(0, DEVICE_NUM)
+ .forEach(
+ i -> {
+ deviceTimeIndex.updateStartTime("root.sg.d" + i, i);
+ deviceTimeIndex.updateEndTime("root.sg.d" + i, i + 1);
+ });
+ tsFileResource.setTimeIndex(deviceTimeIndex);
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+ IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new
MockProgressIndex(i)));
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // clean fake file
+ if (file.exists()) {
+ FileUtils.delete(file);
+ }
+ File resourceFile = new File(file.getName() +
TsFileResource.RESOURCE_SUFFIX);
+ if (resourceFile.exists()) {
+ FileUtils.delete(resourceFile);
+ }
+ }
+
+ @Test
+ public void testProgressIndexRecorder() {
+ Assert.assertTrue(
+ new
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+ indexList.forEach(tsFileResource::updateProgressIndex);
+
+ Assert.assertFalse(
+ new
MockProgressIndex(-1).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+ Assert.assertFalse(
+ new
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+ Assert.assertFalse(
+ new
MockProgressIndex(1).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+ Assert.assertFalse(
+ new MockProgressIndex(INDEX_NUM - 1)
+ .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+ Assert.assertTrue(
+ new
MockProgressIndex(INDEX_NUM).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+ Assert.assertTrue(
+ new MockProgressIndex(Integer.MAX_VALUE)
+ .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+ Assert.assertFalse(
+ new MockProgressIndex(1, INDEX_NUM - 1)
+ .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+ }
+
+ @Test
+ public void testProgressIndexRecorderSerialize() {
+ // TODO: wait for implements of ProgressIndex.deserializeFrom
+ }
+
+ public static class MockProgressIndex implements ProgressIndex {
+ private final int type;
+ private int val;
+
+ public MockProgressIndex(int val) {
+ this(0, val);
+ }
+
+ public MockProgressIndex(int type, int val) {
+ this.type = type;
+ this.val = val;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(val, byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(val, stream);
+ }
+
+ @Override
+ public boolean isAfter(ProgressIndex progressIndex) {
+ if (!(progressIndex instanceof MockProgressIndex)) {
+ return true;
+ }
+
+ MockProgressIndex that = (MockProgressIndex) progressIndex;
+ return this.type == that.type && this.val > that.val;
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ if (!(progressIndex instanceof MockProgressIndex)) {
+ return false;
+ }
+
+ MockProgressIndex that = (MockProgressIndex) progressIndex;
+ return this.type == that.type && this.val == that.val;
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex) {
+ if (!(progressIndex instanceof MockProgressIndex)) {
+ throw new IllegalStateException("Mock update error.");
+ }
+
+ MockProgressIndex that = (MockProgressIndex) progressIndex;
+ if (that.type == this.type) {
+ this.val = Math.max(this.val, that.val);
+ }
+ return this;
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index d833e2761a0..198761eee3b 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
@@ -64,7 +65,8 @@ public class CachedSchemaPatternMatcherTest {
@Test
public void testCachedMatcher() throws ExecutionException,
InterruptedException {
- PipeRealtimeDataRegionCollector databaseCollector = new
PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector databaseCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
databaseCollector.customize(
new PipeParameters(
new HashMap<String, String>() {
@@ -79,7 +81,8 @@ public class CachedSchemaPatternMatcherTest {
int deviceCollectorNum = 10;
int seriesCollectorNum = 10;
for (int i = 0; i < deviceCollectorNum; i++) {
- PipeRealtimeDataRegionCollector deviceCollector = new
PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector deviceCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
int finalI1 = i;
deviceCollector.customize(
new PipeParameters(
@@ -92,7 +95,8 @@ public class CachedSchemaPatternMatcherTest {
null);
collectorList.add(deviceCollector);
for (int j = 0; j < seriesCollectorNum; j++) {
- PipeRealtimeDataRegionCollector seriesCollector = new
PipeRealtimeDataRegionFakeCollector();
+ PipeRealtimeDataRegionCollector seriesCollector =
+ new PipeRealtimeDataRegionFakeCollector(null);
int finalI = i;
int finalJ = j;
seriesCollector.customize(
@@ -149,6 +153,10 @@ public class CachedSchemaPatternMatcherTest {
public static class PipeRealtimeDataRegionFakeCollector extends
PipeRealtimeDataRegionCollector {
+ public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+ super(pipeTaskMeta);
+ }
+
@Override
public Event supply() {
return null;
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 45b0cf504b2..fbb399831eb 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -102,13 +102,17 @@ public class PipeRealtimeCollectTest {
// set up realtime collector
try (PipeRealtimeDataRegionHybridCollector collector1 =
- new PipeRealtimeDataRegionHybridCollector(new
ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector2 =
- new PipeRealtimeDataRegionHybridCollector(new
ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector3 =
- new PipeRealtimeDataRegionHybridCollector(new
ListenableUnblockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector4 =
- new PipeRealtimeDataRegionHybridCollector(new
ListenableUnblockingPendingQueue<>())) {
+ new PipeRealtimeDataRegionHybridCollector(
+ null, new ListenableUnblockingPendingQueue<>())) {
collector1.customize(
new PipeParameters(