This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d3548ed2acd [IOTDB-6209] Pipe: Solving the topological order of the
progress index in the historical data collection phase (#11478)
d3548ed2acd is described below
commit d3548ed2acd013530a25898dbd43d17eeafa0e1d
Author: yschengzi <[email protected]>
AuthorDate: Fri Nov 10 01:11:36 2023 +0800
[IOTDB-6209] Pipe: Solving the topological order of the progress index in
the historical data collection phase (#11478)
* Problem:
When the pipe performs historical data collection, it currently sends
sequential data first and then disorganized data, which is obviously wrong
because the progress index of some of the disorganized data may be smaller than
that of sequential files, which causes the pipe to record the wrong progress
index as the progress information, resulting in some of the disorganized files
not being sent.
* Solution:
After collecting all the historical data, solve for the topological order
according to the progress index contained in the TsFile. Use the topological
order of the file as the order in which the historical data is collected.
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 21 ++-
.../TsFileResourceProgressIndexTest.java | 162 ++++++++++++++++++++-
.../commons/consensus/index/ProgressIndex.java | 146 ++++++++++++++++---
.../consensus/index/impl/HybridProgressIndex.java | 36 ++++-
.../consensus/index/impl/IoTProgressIndex.java | 19 ++-
.../consensus/index/impl/MinimumProgressIndex.java | 12 +-
.../consensus/index/impl/RecoverProgressIndex.java | 22 ++-
.../consensus/index/impl/SimpleProgressIndex.java | 11 +-
.../iotdb/commons/pipe/PipeMetaDeSerTest.java | 4 +-
9 files changed, 385 insertions(+), 48 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index b1d5037078a..9c9fb01ff25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,9 +42,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
@@ -235,6 +237,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
dataRegion.writeLock("Pipe: start to extract historical TsFile");
+ final long startHistoricalExtractionTime = System.currentTimeMillis();
try {
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
final long lastFlushedByPipeTime =
@@ -248,7 +251,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
try {
- pendingQueue = new ArrayDeque<>(tsFileManager.size(true) +
tsFileManager.size(false));
+ final List<TsFileResource> resourceList =
+ new ArrayList<>(tsFileManager.size(true) +
tsFileManager.size(false));
final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
@@ -261,7 +265,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
.collect(Collectors.toList());
- pendingQueue.addAll(sequenceTsFileResources);
+ resourceList.addAll(sequenceTsFileResources);
final Collection<TsFileResource> unsequenceTsFileResources =
tsFileManager.getTsFileList(false).stream()
@@ -274,9 +278,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
.collect(Collectors.toList());
- pendingQueue.addAll(unsequenceTsFileResources);
+ resourceList.addAll(unsequenceTsFileResources);
- pendingQueue.forEach(
+ resourceList.forEach(
resource -> {
// Pin the resource, in case the file is removed by compaction
or anything.
// Will unpin it after the PipeTsFileInsertionEvent is created
and pinned.
@@ -287,12 +291,17 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
});
+ resourceList.sort(
+ (o1, o2) ->
o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
+ pendingQueue = new ArrayDeque<>(resourceList);
+
LOGGER.info(
"Pipe: start to extract historical TsFile, data region {}, "
- + "sequence file count {}, unsequence file count {}",
+ + "sequence file count {}, unsequence file count {},
historical extraction time {} ms",
dataRegionId,
sequenceTsFileResources.size(),
- unsequenceTsFileResources.size());
+ unsequenceTsFileResources.size(),
+ System.currentTimeMillis() - startHistoricalExtractionTime);
} finally {
tsFileManager.readUnlock();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 0dd80260cee..8d819e0d80b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -38,14 +39,18 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.IntStream;
public class TsFileResourceProgressIndexTest {
@@ -91,8 +96,8 @@ public class TsFileResourceProgressIndexTest {
@Test
public void testProgressIndexRecorder() {
- HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
- hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(3, 4));
+ HybridProgressIndex hybridProgressIndex =
+ new HybridProgressIndex(new SimpleProgressIndex(3, 4));
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(6, 6));
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)));
@@ -137,7 +142,7 @@ public class TsFileResourceProgressIndexTest {
// TODO: wait for implements of ProgressIndex.deserializeFrom
}
- public static class MockProgressIndex implements ProgressIndex {
+ public static class MockProgressIndex extends ProgressIndex {
private final int type;
private int val;
@@ -161,7 +166,7 @@ public class TsFileResourceProgressIndexTest {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
if (!(progressIndex instanceof MockProgressIndex)) {
return true;
}
@@ -197,6 +202,11 @@ public class TsFileResourceProgressIndexTest {
public ProgressIndexType getType() {
throw new UnsupportedOperationException("method not implemented.");
}
+
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ return new TotalOrderSumTuple((long) val);
+ }
}
@Test
@@ -204,9 +214,8 @@ public class TsFileResourceProgressIndexTest {
final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex(1, 123L);
final RecoverProgressIndex recoverProgressIndex =
new RecoverProgressIndex(1, new SimpleProgressIndex(2, 2));
- final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+ final HybridProgressIndex hybridProgressIndex = new
HybridProgressIndex(ioTProgressIndex);
- hybridProgressIndex.updateToMinimumIsAfterProgressIndex(ioTProgressIndex);
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(recoverProgressIndex);
Assert.assertTrue(hybridProgressIndex.isAfter(new IoTProgressIndex(1,
100L)));
@@ -218,4 +227,145 @@ public class TsFileResourceProgressIndexTest {
Assert.assertFalse(
hybridProgressIndex.isAfter(new RecoverProgressIndex(1, new
SimpleProgressIndex(2, 21))));
}
+
+ @Test
+ public void testProgressIndexMinimumProgressIndexTopologicalSort() {
+ List<ProgressIndex> progressIndexList = new ArrayList<>();
+
+ int ioTProgressIndexNum = 100;
+ IntStream.range(0, ioTProgressIndexNum)
+ .forEach(i -> progressIndexList.add(new IoTProgressIndex(i, 0L)));
+
+ int minimumProgressIndexNum = 100;
+ IntStream.range(0, minimumProgressIndexNum)
+ .forEach(i -> progressIndexList.add(MinimumProgressIndex.INSTANCE));
+
+ int hybridProgressIndexNum = 100;
+ IntStream.range(0, hybridProgressIndexNum)
+ .forEach(i -> progressIndexList.add(new HybridProgressIndex(new
IoTProgressIndex(i, 0L))));
+ IntStream.range(0, hybridProgressIndexNum)
+ .forEach(
+ i -> progressIndexList.add(new
HybridProgressIndex(MinimumProgressIndex.INSTANCE)));
+
+ Collections.shuffle(progressIndexList);
+ progressIndexList.sort(ProgressIndex::topologicalCompareTo);
+
+ int size = progressIndexList.size();
+ for (int i = 0; i < size - 1; i++) {
+ int finalI = i;
+ for (int j = i; j < size; j++) {
+ if (progressIndexList.get(i).isAfter(progressIndexList.get(j))) {
+ System.out.println("progressIndexList.get(i) = " +
progressIndexList.get(i));
+ System.out.println("i = " + i);
+ System.out.println(
+ "progressIndexList.get(i).getTotalOrderSumTuple() = "
+ + progressIndexList.get(i).getTotalOrderSumTuple());
+ System.out.println("progressIndexList.get(j) = " +
progressIndexList.get(j));
+ System.out.println("j = " + j);
+ System.out.println(
+ "progressIndexList.get(j).getTotalOrderSumTuple() = "
+ + progressIndexList.get(j).getTotalOrderSumTuple());
+ System.out.println(System.lineSeparator());
+ }
+ }
+ Assert.assertTrue(
+ IntStream.range(i, size)
+ .noneMatch(j ->
progressIndexList.get(finalI).isAfter(progressIndexList.get(j))));
+ }
+ }
+
+ @Test
+ public void testProgressIndexTopologicalSort() {
+ Random random = new Random();
+ List<ProgressIndex> progressIndexList = new ArrayList<>();
+
+ int ioTProgressIndexNum = 10000, peerIdRange = 3, searchIndexRange =
100000;
+ IntStream.range(0, ioTProgressIndexNum)
+ .forEach(
+ i ->
+ progressIndexList.add(
+ new IoTProgressIndex(
+ random.nextInt(peerIdRange), (long)
random.nextInt(searchIndexRange))));
+
+ int simpleProgressIndexNum = 10000, rebootTimesRange = 3,
memtableFlushOrderIdRange = 100000;
+ IntStream.range(0, simpleProgressIndexNum)
+ .forEach(
+ i ->
+ progressIndexList.add(
+ new SimpleProgressIndex(
+ random.nextInt(rebootTimesRange),
+ random.nextInt(memtableFlushOrderIdRange))));
+
+ int recoverProgressIndexNum = 10000, dataNodeIdRange = 3;
+ IntStream.range(0, recoverProgressIndexNum)
+ .forEach(
+ i ->
+ progressIndexList.add(
+ new RecoverProgressIndex(
+ random.nextInt(dataNodeIdRange),
+ new SimpleProgressIndex(
+ random.nextInt(rebootTimesRange),
+ random.nextInt(memtableFlushOrderIdRange)))));
+
+ int minimumProgressIndexNum = 10000;
+ IntStream.range(0, minimumProgressIndexNum)
+ .forEach(i -> progressIndexList.add(MinimumProgressIndex.INSTANCE));
+
+ int hybridProgressIndexNum = 10000;
+ IntStream.range(0, hybridProgressIndexNum)
+ .forEach(
+ i -> {
+ HybridProgressIndex hybridProgressIndex =
+ new HybridProgressIndex(
+ new IoTProgressIndex(
+ random.nextInt(peerIdRange), (long)
random.nextInt(searchIndexRange)));
+ if (random.nextInt(2) == 1) {
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new SimpleProgressIndex(
+ random.nextInt(rebootTimesRange),
+ random.nextInt(memtableFlushOrderIdRange)));
+ }
+ if (random.nextInt(2) == 1) {
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+ new RecoverProgressIndex(
+ random.nextInt(dataNodeIdRange),
+ new SimpleProgressIndex(
+ random.nextInt(rebootTimesRange),
+ random.nextInt(memtableFlushOrderIdRange))));
+ }
+ progressIndexList.add(hybridProgressIndex);
+ });
+
+ Collections.shuffle(progressIndexList);
+ final long startTime = System.currentTimeMillis();
+ progressIndexList.sort(ProgressIndex::topologicalCompareTo);
+ final long costTime = System.currentTimeMillis() - startTime;
+ System.out.println("ProgressIndex List Size = " +
progressIndexList.size());
+ System.out.println("sort time = " + costTime + "ms");
+ System.out.println(
+ ("Sort speed = " + (double) (costTime) / ((double)
(progressIndexList.size())) + "ms/s"));
+
+ int size = progressIndexList.size();
+ for (int i = 0; i < size - 1; i++) {
+ int finalI = i;
+ for (int j = i; j < size; j++) {
+ if (progressIndexList.get(i).isAfter(progressIndexList.get(j))) {
+ System.out.println("progressIndexList.get(i) = " +
progressIndexList.get(i));
+ System.out.println("i = " + i);
+ System.out.println(
+ "progressIndexList.get(i).getTotalOrderSumTuple() = "
+ + progressIndexList.get(i).getTotalOrderSumTuple());
+ System.out.println("progressIndexList.get(j) = " +
progressIndexList.get(j));
+ System.out.println("j = " + j);
+ System.out.println(
+ "progressIndexList.get(j).getTotalOrderSumTuple() = "
+ + progressIndexList.get(j).getTotalOrderSumTuple());
+ System.out.println(System.lineSeparator());
+ }
+ }
+ Assert.assertTrue(
+ IntStream.range(i, size)
+ .noneMatch(j ->
progressIndexList.get(finalI).isAfter(progressIndexList.get(j))));
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 0c789716f29..304afc88ecf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -22,52 +22,84 @@ package org.apache.iotdb.commons.consensus.index;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import com.google.common.collect.ImmutableList;
+
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
-public interface ProgressIndex {
+/**
+ * The progress index is designed to express the progress on multiple two
independent causal chains.
+ * For example, all writes/updates/deletions within a single consensus group
form a complete causal
+ * chain. Since strict total order relations can be defined on each of these
causal chains, the
+ * progress index is considered to be composed of an n-tuple of total order
relations
+ * (S<sub>1</sub>,S<sub>2</sub>,S<sub>3</sub>,.... ,S<sub>n</sub>).
+ */
+public abstract class ProgressIndex {
/** Serialize this progress index to the given byte buffer. */
- void serialize(ByteBuffer byteBuffer);
+ public abstract void serialize(ByteBuffer byteBuffer);
/** Serialize this progress index to the given output stream. */
- void serialize(OutputStream stream) throws IOException;
+ public abstract void serialize(OutputStream stream) throws IOException;
/**
- * A.isAfter(B) is true if and only if A is strictly greater than B.
+ * A.isAfter(B) is true if and only if every tuple member in A is strictly
greater than the
+ * corresponding tuple member in B in its corresponding total order relation。
*
* @param progressIndex the progress index to be compared
* @return true if and only if this progress index is strictly greater than
the given consensus
* index
*/
- boolean isAfter(ProgressIndex progressIndex);
+ public abstract boolean isAfter(@Nonnull ProgressIndex progressIndex);
/**
- * A.equals(B) is true if and only if A is equal to B
+ * A.isAfter(B) is true if and only if every tuple member in A is the same
as the corresponding
+ * tuple member in B in its corresponding total order relation.
*
* @param progressIndex the progress index to be compared
* @return true if and only if this progress index is equal to the given
progress index
*/
- boolean equals(ProgressIndex progressIndex);
+ public abstract boolean equals(ProgressIndex progressIndex);
/**
- * A.equals(B) is true if and only if A is equal to B
+ * A.equals(B) is true if and only if A, B are both {@link ProgressIndex}
and A.equals(({@link
+ * ProgressIndex})B) is true.
*
* @param obj the object to be compared
* @return true if and only if this progress index is equal to the given
object
*/
@Override
- boolean equals(Object obj);
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ProgressIndex)) {
+ return false;
+ }
+ return this.equals((ProgressIndex) obj);
+ }
/**
- * C = A.updateToMinimumIsAfterProgressIndex(B) where C should satisfy:
+ * Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if
each tuple member in A
+ * is greater than or equal to B in the corresponding total order relation.
+ *
+ * <p>C = A.updateToMinimumIsAfterProgressIndex(B) should be satisfied:
*
- * <p>(C.equals(A) || C.isAfter(A)) is true
+ * <p>C.isEqualOrAfter(A) is true,
*
- * <p>(C.equals(B) || C.isAfter(B)) is true
+ * <p>C.isEqualOrAfter(B) is true
*
- * <p>There is no D, such that D satisfies the above conditions and
C.isAfter(D) is true
+ * <p>there is no D such that C.isEqualOrAfter(D) is true.
*
* <p>The implementation of this function should be reflexive, that is
*
A.updateToMinimumIsAfterProgressIndex(B).equals(B.updateToMinimumIsAfterProgressIndex(A))
is
@@ -78,10 +110,25 @@ public interface ProgressIndex {
* @param progressIndex the progress index to be compared
* @return the minimum progress index after the given progress index and
this progress index
*/
- ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex
progressIndex);
+ public abstract ProgressIndex
updateToMinimumIsAfterProgressIndex(ProgressIndex progressIndex);
/** @return the type of this progress index */
- ProgressIndexType getType();
+ public abstract ProgressIndexType getType();
+
+ /**
+ * Get the sum of the tuples of each total order relation of the progress
index, which is used for
+ * topological sorting of the progress index.
+ *
+ * @return The sum of the tuples corresponding to all the total order
relations contained in the
+ * progress index.
+ */
+ public abstract TotalOrderSumTuple getTotalOrderSumTuple();
+
+ public final int topologicalCompareTo(ProgressIndex progressIndex) {
+ return progressIndex == null
+ ? 1
+ :
getTotalOrderSumTuple().compareTo(progressIndex.getTotalOrderSumTuple());
+ }
/**
* Blend two progress index together, the result progress index should
satisfy:
@@ -106,7 +153,7 @@ public interface ProgressIndex {
* progress index
* @return the minimum progress index after the first progress index and the
second progress index
*/
- static ProgressIndex blendProgressIndex(
+ protected static ProgressIndex blendProgressIndex(
ProgressIndex progressIndex1, ProgressIndex progressIndex2) {
if (progressIndex1 == null && progressIndex2 == null) {
return MinimumProgressIndex.INSTANCE;
@@ -118,7 +165,72 @@ public interface ProgressIndex {
return progressIndex1; // progressIndex1 is not null
}
- return new HybridProgressIndex(progressIndex1.getType().getType(),
progressIndex1)
+ return new HybridProgressIndex(progressIndex1)
.updateToMinimumIsAfterProgressIndex(progressIndex2);
}
+
+ /**
+ * Each total ordered relation of the progress index should be a tuple. This
class defines a way
+ * to sum and compare the size of the tuples of each total ordered relation
of progress index.
+ * This method maintains the relationship of progress index in the isAfter
relationship. It is
+ * mainly used for topologically sorting the progress index.
+ *
+ * <p>Notice:TotalOrderSumTuple is an ordered tuple, the larger the
subscript the higher the
+ * weight of the element when comparing sizes, e.g. (1, 2) is larger than
(2, 1).
+ */
+ protected static class TotalOrderSumTuple implements
Comparable<TotalOrderSumTuple> {
+ private final ImmutableList<Long> tuple;
+
+ /**
+ * ATTENTION: the order of the args is important, the larger the subscript
the higher the weight
+ * of the element when comparing sizes, e.g. (1, 2) is larger than (2, 1).
+ *
+ * @param args input args
+ */
+ public TotalOrderSumTuple(Long... args) {
+ tuple = ImmutableList.copyOf(args);
+ }
+
+ public TotalOrderSumTuple(List<Long> list) {
+ tuple = ImmutableList.copyOf(list);
+ }
+
+ @Override
+ public int compareTo(TotalOrderSumTuple that) {
+ if (that.tuple.size() != this.tuple.size()) {
+ return this.tuple.size() - that.tuple.size();
+ }
+ for (int i = this.tuple.size() - 1; i >= 0; --i) {
+ if (!this.tuple.get(i).equals(that.tuple.get(i))) {
+ return this.tuple.get(i) < that.tuple.get(i) ? -1 : 1;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return "TotalOrderSumTuple{" + "tuple=" + tuple + '}';
+ }
+
+ public static TotalOrderSumTuple sum(List<TotalOrderSumTuple> tupleList) {
+ if (tupleList == null || tupleList.size() == 0) {
+ return new TotalOrderSumTuple();
+ }
+ if (tupleList.size() == 1) {
+ return tupleList.get(0);
+ }
+
+ List<Long> result =
+ LongStream.range(0, tupleList.stream().mapToInt(t ->
t.tuple.size()).max().getAsInt())
+ .map(o -> 0)
+ .boxed()
+ .collect(Collectors.toList());
+ tupleList.forEach(
+ t ->
+ IntStream.range(0, t.tuple.size())
+ .forEach(i -> result.set(i, result.get(i) +
t.tuple.get(i))));
+ return new TotalOrderSumTuple(result);
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index 718970547cd..da558e52d65 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,20 +32,26 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
-public class HybridProgressIndex implements ProgressIndex {
+public class HybridProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Short, ProgressIndex> type2Index;
- public HybridProgressIndex() {
+ private HybridProgressIndex() {
this.type2Index = new HashMap<>();
}
- public HybridProgressIndex(short type, ProgressIndex progressIndex) {
- this.type2Index = new HashMap<>();
- type2Index.put(type, progressIndex);
+ public HybridProgressIndex(ProgressIndex progressIndex) {
+ short type = progressIndex.getType().getType();
+ if (ProgressIndexType.HYBRID_PROGRESS_INDEX.getType() != type) {
+ this.type2Index = new HashMap<>();
+ type2Index.put(type, progressIndex);
+ } else {
+ this.type2Index = ((HybridProgressIndex) progressIndex).type2Index;
+ }
}
@Override
@@ -79,11 +87,12 @@ public class HybridProgressIndex implements ProgressIndex {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (progressIndex instanceof MinimumProgressIndex) {
- return true;
+ return type2Index.size() > 1
+ ||
!type2Index.containsKey(ProgressIndexType.MINIMUM_PROGRESS_INDEX.getType());
}
if (!(progressIndex instanceof HybridProgressIndex)) {
@@ -192,6 +201,19 @@ public class HybridProgressIndex implements ProgressIndex {
return ProgressIndexType.HYBRID_PROGRESS_INDEX;
}
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ lock.readLock().lock();
+ try {
+ return ProgressIndex.TotalOrderSumTuple.sum(
+ type2Index.values().stream()
+ .map(ProgressIndex::getTotalOrderSumTuple)
+ .collect(Collectors.toList()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
final int size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index 60c4092f760..849a52b6981 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,13 +33,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class IoTProgressIndex implements ProgressIndex {
+public class IoTProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Integer, Long> peerId2SearchIndex;
- public IoTProgressIndex() {
+ private IoTProgressIndex() {
peerId2SearchIndex = new HashMap<>();
}
@@ -79,7 +81,7 @@ public class IoTProgressIndex implements ProgressIndex {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (progressIndex instanceof MinimumProgressIndex) {
@@ -176,6 +178,17 @@ public class IoTProgressIndex implements ProgressIndex {
return ProgressIndexType.IOT_PROGRESS_INDEX;
}
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ lock.readLock().lock();
+ try {
+ return new TotalOrderSumTuple(
+
peerId2SearchIndex.values().stream().mapToLong(Long::longValue).sum());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
final int size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index a5b587c6669..c225e995d7a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -22,14 +22,17 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-public class MinimumProgressIndex implements ProgressIndex {
+public class MinimumProgressIndex extends ProgressIndex {
public static final MinimumProgressIndex INSTANCE = new
MinimumProgressIndex();
+ private static final TotalOrderSumTuple TOTAL_ORDER_SUM_TUPLE = new
TotalOrderSumTuple();
private MinimumProgressIndex() {}
@@ -44,7 +47,7 @@ public class MinimumProgressIndex implements ProgressIndex {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
return false;
}
@@ -79,6 +82,11 @@ public class MinimumProgressIndex implements ProgressIndex {
return ProgressIndexType.MINIMUM_PROGRESS_INDEX;
}
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ return TOTAL_ORDER_SUM_TUPLE;
+ }
+
public static MinimumProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
return INSTANCE;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index 9c03edbc2b4..18e694bc629 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,14 +32,15 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
-public class RecoverProgressIndex implements ProgressIndex {
+public class RecoverProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex;
- public RecoverProgressIndex() {
+ private RecoverProgressIndex() {
this.dataNodeId2LocalIndex = new HashMap<>();
}
@@ -80,7 +83,7 @@ public class RecoverProgressIndex implements ProgressIndex {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (progressIndex instanceof MinimumProgressIndex) {
@@ -183,6 +186,19 @@ public class RecoverProgressIndex implements ProgressIndex
{
return ProgressIndexType.RECOVER_PROGRESS_INDEX;
}
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ lock.readLock().lock();
+ try {
+ return ProgressIndex.TotalOrderSumTuple.sum(
+ dataNodeId2LocalIndex.values().stream()
+ .map(SimpleProgressIndex::getTotalOrderSumTuple)
+ .collect(Collectors.toList()));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
public static RecoverProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
final RecoverProgressIndex recoverProgressIndex = new
RecoverProgressIndex();
final int size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index 9ce9890ecb4..2984149db58 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -23,13 +23,15 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class SimpleProgressIndex implements ProgressIndex {
+public class SimpleProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -68,7 +70,7 @@ public class SimpleProgressIndex implements ProgressIndex {
}
@Override
- public boolean isAfter(ProgressIndex progressIndex) {
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
lock.readLock().lock();
try {
if (progressIndex instanceof MinimumProgressIndex) {
@@ -173,6 +175,11 @@ public class SimpleProgressIndex implements ProgressIndex {
return ProgressIndexType.SIMPLE_PROGRESS_INDEX;
}
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ return new TotalOrderSumTuple(memtableFlushOrderId, (long) rebootTimes);
+ }
+
public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
final int rebootTimes = ReadWriteIOUtils.readInt(byteBuffer);
final long memtableFlushOrderId = ReadWriteIOUtils.readLong(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
index 9eee8396d03..d8bd4a8c0ac 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
@@ -65,8 +65,8 @@ public class PipeMetaDeSerTest {
PipeStaticMeta pipeStaticMeta1 =
PipeStaticMeta.deserialize(staticByteBuffer);
Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
- HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
- hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(1, 2));
+ HybridProgressIndex hybridProgressIndex =
+ new HybridProgressIndex(new SimpleProgressIndex(1, 2));
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
SimpleProgressIndex(2, 4));
hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new
IoTProgressIndex(3, 6L));