This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new c223f62f3ce Pipe: perform deep copy for incoming progress index when
constructing and updating progress index & fix hash code of progress index
(#13441) (#13503)
c223f62f3ce is described below
commit c223f62f3ce1577aab2c0f1c4e79fee3bdad11eb
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Sep 13 10:59:23 2024 +0800
Pipe: perform deep copy for incoming progress index when constructing and
updating progress index & fix hash code of progress index (#13441) (#13503)
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 2 +-
.../plan/node/write/InsertMultiTabletsNode.java | 2 +-
.../plan/planner/plan/node/write/InsertNode.java | 2 +-
.../planner/plan/node/write/InsertRowsNode.java | 4 +--
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 2 +-
.../dataregion/tsfile/TsFileResource.java | 4 +--
.../TsFileResourceProgressIndexTest.java | 5 ++++
.../commons/consensus/index/ProgressIndex.java | 34 ++++++++++++++++++----
.../consensus/index/impl/HybridProgressIndex.java | 27 ++++++++++++-----
.../consensus/index/impl/IoTProgressIndex.java | 16 ++++++++--
.../consensus/index/impl/MetaProgressIndex.java | 8 ++++-
.../consensus/index/impl/MinimumProgressIndex.java | 7 ++++-
.../consensus/index/impl/RecoverProgressIndex.java | 25 +++++++++++-----
.../consensus/index/impl/SimpleProgressIndex.java | 12 ++++++--
.../consensus/index/impl/StateProgressIndex.java | 22 +++++++++++---
.../index/impl/TimeWindowStateProgressIndex.java | 17 +++++++++--
16 files changed, 146 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a0292d1b245..bfd57fe6e24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -261,7 +261,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
- startIndex = environment.getPipeTaskMeta().getProgressIndex();
+ startIndex = environment.getPipeTaskMeta().getProgressIndex().deepCopy();
dataRegionId = environment.getRegionId();
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 430ab19c7c9..a5885604645 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -289,7 +289,7 @@ public class InsertMultiTabletsNode extends InsertNode {
@Override
public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
+ this.progressIndex = progressIndex.deepCopy();
insertTabletNodeList.forEach(node -> node.setProgressIndex(progressIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 161b6ac9d2e..06581922e1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -277,7 +277,7 @@ public abstract class InsertNode extends SearchNode
implements ComparableConsens
@Override
public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
+ this.progressIndex = progressIndex.deepCopy();
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index b6f16b7d70a..2e86672aaa8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -276,7 +276,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
@Override
public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
+ this.progressIndex = progressIndex.deepCopy();
insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setProgressIndex(progressIndex));
}
@@ -287,7 +287,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
this.progressIndex =
(this.progressIndex == null)
- ? progressIndex
+ ? progressIndex.deepCopy()
:
this.progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 0ec16ef69c7..86764946903 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -331,7 +331,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
@Override
public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
+ this.progressIndex = progressIndex.deepCopy();
insertRowNodeList.forEach(insertRowNode ->
insertRowNode.setProgressIndex(progressIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index b7973cc03df..10033eb7381 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1135,7 +1135,7 @@ public class TsFileResource {
maxProgressIndex =
(maxProgressIndex == null
- ? progressIndex
+ ? progressIndex.deepCopy()
:
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
}
@@ -1144,7 +1144,7 @@ public class TsFileResource {
return;
}
- maxProgressIndex = progressIndex;
+ maxProgressIndex = progressIndex.deepCopy();
}
public ProgressIndex getMaxProgressIndexAfterClose() throws
IllegalStateException {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index fa45296bdb6..3c5fd714d8e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -188,6 +188,11 @@ public class TsFileResourceProgressIndexTest {
return this.type == that.type && this.val == that.val;
}
+ @Override
+ public ProgressIndex deepCopy() {
+ return new MockProgressIndex(type, val);
+ }
+
@Override
public ProgressIndex
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
if (!(progressIndex instanceof MockProgressIndex)) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index c876af8f09f..3160b1c23a1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -95,6 +95,22 @@ public abstract class ProgressIndex {
return super.hashCode();
}
+ /**
+ * Creates and returns a deep copy of this {@link ProgressIndex} instance.
+ *
+ * <p>This method performs a deep copy, meaning all nested objects and
fields within this {@link
+ * ProgressIndex} are recursively copied, resulting in a new instance that
is independent of the
+ * original. Modifications to the copied instance will not affect the
original instance and vice
+ * versa.
+ *
+ * <p>When constructing or updating another {@link ProgressIndex} using an
existing {@link
+ * ProgressIndex}, it is recommended to perform a deep copy of the existing
instance to avoid
+ * unintended modifications or shared state between the instances.
+ *
+ * @return a new {@link ProgressIndex} instance that is a deep copy of this
progress index
+ */
+ public abstract ProgressIndex deepCopy();
+
/**
* Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if
each tuple member in A
* is greater than or equal to B in the corresponding total order relation.
@@ -112,11 +128,13 @@ public abstract class ProgressIndex {
*
A.updateToMinimumIsAfterProgressIndex(B).equals(B.updateToMinimumIsAfterProgressIndex(A))
is
* {@code true}
*
- * <p>Note: this function may modify the caller.
+ * <p>Note: this function may modify the caller (this) but will not modify
{@param progressIndex}.
*
* @param progressIndex the {@link ProgressIndex} to be compared
* @return the minimum {@link ProgressIndex} after the given {@link
ProgressIndex} and this {@link
- * ProgressIndex}
+ * ProgressIndex}, the returned {@link ProgressIndex} will contain deep
copies of all
+ * references to the given {@param progressIndex}, ensuring no shared
state between the
+ * original and the result
*/
public abstract ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(
ProgressIndex progressIndex);
@@ -148,6 +166,8 @@ public abstract class ProgressIndex {
*
* <p>There is no R, such that R satisfies the above conditions and
result.isAfter(R) is true
*
+ * <p>Note: this function will not modify {@param progressIndex1} and
{@param progressIndex2}.
+ *
* @param progressIndex1 the first {@link ProgressIndex}. if it is {@code
null}, the result {@link
* ProgressIndex} should be the second {@link ProgressIndex}. if it is a
{@link
* MinimumProgressIndex}, the result {@link ProgressIndex} should be the
second {@link
@@ -163,7 +183,9 @@ public abstract class ProgressIndex {
* should be the minimum {@link ProgressIndex} equal or after the first
{@link ProgressIndex}
* and the second {@link ProgressIndex}
* @return the minimum {@link ProgressIndex} after the first {@link
ProgressIndex} and the second
- * {@link ProgressIndex}
+ * {@link ProgressIndex}, the returned {@link ProgressIndex} will
contain deep copies of all
+ * references to {@param progressIndex1} and {@param progressIndex2},
ensuring that the result
+ * is independent and modifications to it do not affect the original
instances
*/
protected static ProgressIndex blendProgressIndex(
ProgressIndex progressIndex1, ProgressIndex progressIndex2) {
@@ -171,14 +193,14 @@ public abstract class ProgressIndex {
return MinimumProgressIndex.INSTANCE;
}
if (progressIndex1 == null || progressIndex1 instanceof
MinimumProgressIndex) {
- return progressIndex2 == null ? MinimumProgressIndex.INSTANCE :
progressIndex2;
+ return progressIndex2 == null ? MinimumProgressIndex.INSTANCE :
progressIndex2.deepCopy();
}
if (progressIndex2 == null || progressIndex2 instanceof
MinimumProgressIndex) {
- return progressIndex1; // progressIndex1 is not null
+ return progressIndex1.deepCopy(); // progressIndex1 is not null
}
return progressIndex1 instanceof StateProgressIndex
- ?
progressIndex1.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
+ ?
progressIndex1.deepCopy().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
: new HybridProgressIndex(progressIndex1)
.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index 70727c9dcf6..efa566a9c8d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -32,6 +33,8 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -47,16 +50,19 @@ public class HybridProgressIndex extends ProgressIndex {
public HybridProgressIndex(ProgressIndex progressIndex) {
short type = progressIndex.getType().getType();
+ this.type2Index = new HashMap<>();
if (ProgressIndexType.HYBRID_PROGRESS_INDEX.getType() != type) {
- this.type2Index = new HashMap<>();
- type2Index.put(type, progressIndex);
+ this.type2Index.put(type, progressIndex.deepCopy());
} else {
- this.type2Index = ((HybridProgressIndex) progressIndex).type2Index;
+ for (Entry<Short, ProgressIndex> entry :
+ ((HybridProgressIndex) progressIndex).type2Index.entrySet()) {
+ this.type2Index.put(entry.getKey(), entry.getValue().deepCopy());
+ }
}
}
public Map<Short, ProgressIndex> getType2Index() {
- return type2Index;
+ return ImmutableMap.copyOf(((HybridProgressIndex) deepCopy()).type2Index);
}
@Override
@@ -166,7 +172,12 @@ public class HybridProgressIndex extends ProgressIndex {
@Override
public int hashCode() {
- return 0;
+ return Objects.hash(type2Index);
+ }
+
+ @Override
+ public ProgressIndex deepCopy() {
+ return new HybridProgressIndex(this);
}
@Override
@@ -178,7 +189,7 @@ public class HybridProgressIndex extends ProgressIndex {
}
if (progressIndex instanceof StateProgressIndex) {
- return progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this);
+ return
progressIndex.deepCopy().updateToMinimumEqualOrIsAfterProgressIndex(this);
}
if (!(progressIndex instanceof HybridProgressIndex)) {
@@ -186,7 +197,7 @@ public class HybridProgressIndex extends ProgressIndex {
progressIndex.getType().getType(),
(thisK, thisV) ->
(thisV == null
- ? progressIndex
+ ? progressIndex.deepCopy()
:
thisV.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex)));
return this;
}
@@ -199,7 +210,7 @@ public class HybridProgressIndex extends ProgressIndex {
thatK,
(thisK, thisV) ->
(thisV == null
- ? thatV
+ ? thatV.deepCopy()
:
thisV.updateToMinimumEqualOrIsAfterProgressIndex(thatV))));
return this;
} finally {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index 1d7c31bbc1e..d1fa57c9406 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nonnull;
@@ -32,6 +33,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class IoTProgressIndex extends ProgressIndex {
@@ -45,8 +47,11 @@ public class IoTProgressIndex extends ProgressIndex {
}
public IoTProgressIndex(Integer peerId, Long searchIndex) {
- peerId2SearchIndex = new HashMap<>();
- peerId2SearchIndex.put(peerId, searchIndex);
+ this(ImmutableMap.of(peerId, searchIndex));
+ }
+
+ public IoTProgressIndex(Map<Integer, Long> peerId2SearchIndex) {
+ this.peerId2SearchIndex = new HashMap<>(peerId2SearchIndex);
}
@Override
@@ -151,7 +156,12 @@ public class IoTProgressIndex extends ProgressIndex {
@Override
public int hashCode() {
- return 0;
+ return Objects.hash(peerId2SearchIndex);
+ }
+
+ @Override
+ public ProgressIndex deepCopy() {
+ return new IoTProgressIndex(peerId2SearchIndex);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index 679eecbf0eb..5349f83d294 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MetaProgressIndex extends ProgressIndex {
@@ -130,7 +131,12 @@ public class MetaProgressIndex extends ProgressIndex {
@Override
public int hashCode() {
- return 0;
+ return Objects.hash(index);
+ }
+
+ @Override
+ public ProgressIndex deepCopy() {
+ return new MetaProgressIndex(index);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e476409a30b..cd5b254f74f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -72,9 +72,14 @@ public class MinimumProgressIndex extends ProgressIndex {
return 0;
}
+ @Override
+ public ProgressIndex deepCopy() {
+ return INSTANCE;
+ }
+
@Override
public ProgressIndex
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
- return progressIndex == null ? this : progressIndex;
+ return progressIndex == null ? this : progressIndex.deepCopy();
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index 5a567fd4981..3a743bcb5d5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -33,6 +33,8 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -47,15 +49,19 @@ public class RecoverProgressIndex extends ProgressIndex {
}
public RecoverProgressIndex(int dataNodeId, SimpleProgressIndex
simpleProgressIndex) {
- this.dataNodeId2LocalIndex = new HashMap<>();
+ this(ImmutableMap.of(dataNodeId, simpleProgressIndex));
+ }
- dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
+ public RecoverProgressIndex(Map<Integer, SimpleProgressIndex>
dataNodeId2LocalIndex) {
+ this.dataNodeId2LocalIndex = new HashMap<>();
+ for (Entry<Integer, SimpleProgressIndex> entry :
dataNodeId2LocalIndex.entrySet()) {
+ this.dataNodeId2LocalIndex.put(
+ entry.getKey(), (SimpleProgressIndex) entry.getValue().deepCopy());
+ }
}
public Map<Integer, SimpleProgressIndex> getDataNodeId2LocalIndex() {
- return ImmutableMap.<Integer, SimpleProgressIndex>builder()
- .putAll(dataNodeId2LocalIndex)
- .build();
+ return ImmutableMap.copyOf(((RecoverProgressIndex)
deepCopy()).dataNodeId2LocalIndex);
}
@Override
@@ -162,7 +168,12 @@ public class RecoverProgressIndex extends ProgressIndex {
@Override
public int hashCode() {
- return 0;
+ return Objects.hash(dataNodeId2LocalIndex);
+ }
+
+ @Override
+ public ProgressIndex deepCopy() {
+ return new RecoverProgressIndex(dataNodeId2LocalIndex);
}
@Override
@@ -181,7 +192,7 @@ public class RecoverProgressIndex extends ProgressIndex {
thatK,
(thisK, thisV) ->
(thisV == null
- ? thatV
+ ? (SimpleProgressIndex) thatV.deepCopy()
: (SimpleProgressIndex)
thisV.updateToMinimumEqualOrIsAfterProgressIndex(thatV))));
return this;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index 677585ce91c..267da7a8b50 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SimpleProgressIndex extends ProgressIndex {
@@ -136,7 +137,12 @@ public class SimpleProgressIndex extends ProgressIndex {
@Override
public int hashCode() {
- return 0;
+ return Objects.hash(rebootTimes, memtableFlushOrderId);
+ }
+
+ @Override
+ public ProgressIndex deepCopy() {
+ return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
}
@Override
@@ -153,7 +159,7 @@ public class SimpleProgressIndex extends ProgressIndex {
return this;
}
if (thisSimpleProgressIndex.rebootTimes <
thatSimpleProgressIndex.rebootTimes) {
- return progressIndex;
+ return progressIndex.deepCopy();
}
// thisSimpleProgressIndex.rebootTimes ==
thatSimpleProgressIndex.rebootTimes
if (thisSimpleProgressIndex.memtableFlushOrderId
@@ -162,7 +168,7 @@ public class SimpleProgressIndex extends ProgressIndex {
}
if (thisSimpleProgressIndex.memtableFlushOrderId
< thatSimpleProgressIndex.memtableFlushOrderId) {
- return progressIndex;
+ return progressIndex.deepCopy();
}
// thisSimpleProgressIndex.memtableFlushOrderId ==
// thatSimpleProgressIndex.memtableFlushOrderId
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
index 8b44edfe07d..e66dae98d49 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -36,6 +37,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+/**
+ * NOTE: Currently, {@link StateProgressIndex} does not perform deep copies of
the {@link Binary}
+ * during construction or updates, which may lead to unintended shared state
or modifications. This
+ * behavior should be reviewed and adjusted as necessary to ensure the
integrity and independence of
+ * the progress index instances.
+ */
public class StateProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -47,8 +54,8 @@ public class StateProgressIndex extends ProgressIndex {
public StateProgressIndex(
long version, Map<String, Binary> state, ProgressIndex
innerProgressIndex) {
this.version = version;
- this.state = state;
- this.innerProgressIndex = innerProgressIndex;
+ this.state = new HashMap<>(state);
+ this.innerProgressIndex = innerProgressIndex.deepCopy();
}
public long getVersion() {
@@ -56,11 +63,13 @@ public class StateProgressIndex extends ProgressIndex {
}
public ProgressIndex getInnerProgressIndex() {
- return innerProgressIndex == null ? MinimumProgressIndex.INSTANCE :
innerProgressIndex;
+ return innerProgressIndex == null
+ ? MinimumProgressIndex.INSTANCE
+ : innerProgressIndex.deepCopy();
}
public Map<String, Binary> getState() {
- return state;
+ return ImmutableMap.copyOf(state);
}
@Override
@@ -158,6 +167,11 @@ public class StateProgressIndex extends ProgressIndex {
return Objects.hash(innerProgressIndex, version);
}
+ @Override
+ public ProgressIndex deepCopy() {
+ return new StateProgressIndex(version, state, innerProgressIndex);
+ }
+
@Override
public ProgressIndex
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
lock.writeLock().lock();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index ea5469bdb25..4195f027681 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -37,6 +38,12 @@ import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+/**
+ * NOTE: Currently, {@link TimeWindowStateProgressIndex} does not perform deep
copies of the {@link
+ * ByteBuffer} during construction or updates, which may lead to unintended
shared state or
+ * modifications. This behavior should be reviewed and adjusted as necessary
to ensure the integrity
+ * and independence of the progress index instances.
+ */
public class TimeWindowStateProgressIndex extends ProgressIndex {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -46,7 +53,8 @@ public class TimeWindowStateProgressIndex extends
ProgressIndex {
public TimeWindowStateProgressIndex(
@Nonnull Map<String, Pair<Long, ByteBuffer>>
timeSeries2TimestampWindowBufferPairMap) {
- this.timeSeries2TimestampWindowBufferPairMap =
timeSeries2TimestampWindowBufferPairMap;
+ this.timeSeries2TimestampWindowBufferPairMap =
+ new HashMap<>(timeSeries2TimestampWindowBufferPairMap);
}
private TimeWindowStateProgressIndex() {
@@ -54,7 +62,7 @@ public class TimeWindowStateProgressIndex extends
ProgressIndex {
}
public Map<String, Pair<Long, ByteBuffer>>
getTimeSeries2TimestampWindowBufferPairMap() {
- return timeSeries2TimestampWindowBufferPairMap;
+ return ImmutableMap.copyOf(timeSeries2TimestampWindowBufferPairMap);
}
public long getMinTime() {
@@ -186,6 +194,11 @@ public class TimeWindowStateProgressIndex extends
ProgressIndex {
return Objects.hash(timeSeries2TimestampWindowBufferPairMap);
}
+ @Override
+ public ProgressIndex deepCopy() {
+ return new
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap);
+ }
+
@Override
public ProgressIndex
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
lock.writeLock().lock();