This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new c223f62f3ce Pipe: perform deep copy for incoming progress index when 
constructing and updating progress index & fix hash code of progress index 
(#13441) (#13503)
c223f62f3ce is described below

commit c223f62f3ce1577aab2c0f1c4e79fee3bdad11eb
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Sep 13 10:59:23 2024 +0800

    Pipe: perform deep copy for incoming progress index when constructing and 
updating progress index & fix hash code of progress index (#13441) (#13503)
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  2 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  2 +-
 .../plan/planner/plan/node/write/InsertNode.java   |  2 +-
 .../planner/plan/node/write/InsertRowsNode.java    |  4 +--
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  2 +-
 .../dataregion/tsfile/TsFileResource.java          |  4 +--
 .../TsFileResourceProgressIndexTest.java           |  5 ++++
 .../commons/consensus/index/ProgressIndex.java     | 34 ++++++++++++++++++----
 .../consensus/index/impl/HybridProgressIndex.java  | 27 ++++++++++++-----
 .../consensus/index/impl/IoTProgressIndex.java     | 16 ++++++++--
 .../consensus/index/impl/MetaProgressIndex.java    |  8 ++++-
 .../consensus/index/impl/MinimumProgressIndex.java |  7 ++++-
 .../consensus/index/impl/RecoverProgressIndex.java | 25 +++++++++++-----
 .../consensus/index/impl/SimpleProgressIndex.java  | 12 ++++++--
 .../consensus/index/impl/StateProgressIndex.java   | 22 +++++++++++---
 .../index/impl/TimeWindowStateProgressIndex.java   | 17 +++++++++--
 16 files changed, 146 insertions(+), 43 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a0292d1b245..bfd57fe6e24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -261,7 +261,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
-    startIndex = environment.getPipeTaskMeta().getProgressIndex();
+    startIndex = environment.getPipeTaskMeta().getProgressIndex().deepCopy();
 
     dataRegionId = environment.getRegionId();
     synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 430ab19c7c9..a5885604645 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -289,7 +289,7 @@ public class InsertMultiTabletsNode extends InsertNode {
 
   @Override
   public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
+    this.progressIndex = progressIndex.deepCopy();
     insertTabletNodeList.forEach(node -> node.setProgressIndex(progressIndex));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 161b6ac9d2e..06581922e1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -277,7 +277,7 @@ public abstract class InsertNode extends SearchNode 
implements ComparableConsens
 
   @Override
   public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
+    this.progressIndex = progressIndex.deepCopy();
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index b6f16b7d70a..2e86672aaa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -276,7 +276,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
 
   @Override
   public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
+    this.progressIndex = progressIndex.deepCopy();
     insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setProgressIndex(progressIndex));
   }
 
@@ -287,7 +287,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
 
     this.progressIndex =
         (this.progressIndex == null)
-            ? progressIndex
+            ? progressIndex.deepCopy()
             : 
this.progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 0ec16ef69c7..86764946903 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -331,7 +331,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
 
   @Override
   public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
+    this.progressIndex = progressIndex.deepCopy();
     insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setProgressIndex(progressIndex));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index b7973cc03df..10033eb7381 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1135,7 +1135,7 @@ public class TsFileResource {
 
     maxProgressIndex =
         (maxProgressIndex == null
-            ? progressIndex
+            ? progressIndex.deepCopy()
             : 
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
   }
 
@@ -1144,7 +1144,7 @@ public class TsFileResource {
       return;
     }
 
-    maxProgressIndex = progressIndex;
+    maxProgressIndex = progressIndex.deepCopy();
   }
 
   public ProgressIndex getMaxProgressIndexAfterClose() throws 
IllegalStateException {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index fa45296bdb6..3c5fd714d8e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -188,6 +188,11 @@ public class TsFileResourceProgressIndexTest {
       return this.type == that.type && this.val == that.val;
     }
 
+    @Override
+    public ProgressIndex deepCopy() {
+      return new MockProgressIndex(type, val);
+    }
+
     @Override
     public ProgressIndex 
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
       if (!(progressIndex instanceof MockProgressIndex)) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index c876af8f09f..3160b1c23a1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -95,6 +95,22 @@ public abstract class ProgressIndex {
     return super.hashCode();
   }
 
+  /**
+   * Creates and returns a deep copy of this {@link ProgressIndex} instance.
+   *
+   * <p>This method performs a deep copy, meaning all nested objects and 
fields within this {@link
+   * ProgressIndex} are recursively copied, resulting in a new instance that 
is independent of the
+   * original. Modifications to the copied instance will not affect the 
original instance and vice
+   * versa.
+   *
+   * <p>When constructing or updating another {@link ProgressIndex} using an 
existing {@link
+   * ProgressIndex}, it is recommended to perform a deep copy of the existing 
instance to avoid
+   * unintended modifications or shared state between the instances.
+   *
+   * @return a new {@link ProgressIndex} instance that is a deep copy of this 
progress index
+   */
+  public abstract ProgressIndex deepCopy();
+
   /**
    * Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if 
each tuple member in A
    * is greater than or equal to B in the corresponding total order relation.
@@ -112,11 +128,13 @@ public abstract class ProgressIndex {
    * 
A.updateToMinimumIsAfterProgressIndex(B).equals(B.updateToMinimumIsAfterProgressIndex(A))
 is
    * {@code true}
    *
-   * <p>Note: this function may modify the caller.
+   * <p>Note: this function may modify the caller (this) but will not modify 
{@param progressIndex}.
    *
    * @param progressIndex the {@link ProgressIndex} to be compared
    * @return the minimum {@link ProgressIndex} after the given {@link 
ProgressIndex} and this {@link
-   *     ProgressIndex}
+   *     ProgressIndex}, the returned {@link ProgressIndex} will contain deep 
copies of all
+   *     references to the given {@param progressIndex}, ensuring no shared 
state between the
+   *     original and the result
    */
   public abstract ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(
       ProgressIndex progressIndex);
@@ -148,6 +166,8 @@ public abstract class ProgressIndex {
    *
    * <p>There is no R, such that R satisfies the above conditions and 
result.isAfter(R) is true
    *
+   * <p>Note: this function will not modify {@param progressIndex1} and 
{@param progressIndex2}.
+   *
    * @param progressIndex1 the first {@link ProgressIndex}. if it is {@code 
null}, the result {@link
    *     ProgressIndex} should be the second {@link ProgressIndex}. if it is a 
{@link
    *     MinimumProgressIndex}, the result {@link ProgressIndex} should be the 
second {@link
@@ -163,7 +183,9 @@ public abstract class ProgressIndex {
    *     should be the minimum {@link ProgressIndex} equal or after the first 
{@link ProgressIndex}
    *     and the second {@link ProgressIndex}
    * @return the minimum {@link ProgressIndex} after the first {@link 
ProgressIndex} and the second
-   *     {@link ProgressIndex}
+   *     {@link ProgressIndex}, the returned {@link ProgressIndex} will 
contain deep copies of all
+   *     references to {@param progressIndex1} and {@param progressIndex2}, 
ensuring that the result
+   *     is independent and modifications to it do not affect the original 
instances
    */
   protected static ProgressIndex blendProgressIndex(
       ProgressIndex progressIndex1, ProgressIndex progressIndex2) {
@@ -171,14 +193,14 @@ public abstract class ProgressIndex {
       return MinimumProgressIndex.INSTANCE;
     }
     if (progressIndex1 == null || progressIndex1 instanceof 
MinimumProgressIndex) {
-      return progressIndex2 == null ? MinimumProgressIndex.INSTANCE : 
progressIndex2;
+      return progressIndex2 == null ? MinimumProgressIndex.INSTANCE : 
progressIndex2.deepCopy();
     }
     if (progressIndex2 == null || progressIndex2 instanceof 
MinimumProgressIndex) {
-      return progressIndex1; // progressIndex1 is not null
+      return progressIndex1.deepCopy(); // progressIndex1 is not null
     }
 
     return progressIndex1 instanceof StateProgressIndex
-        ? 
progressIndex1.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
+        ? 
progressIndex1.deepCopy().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
         : new HybridProgressIndex(progressIndex1)
             .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index 70727c9dcf6..efa566a9c8d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -32,6 +33,8 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -47,16 +50,19 @@ public class HybridProgressIndex extends ProgressIndex {
 
   public HybridProgressIndex(ProgressIndex progressIndex) {
     short type = progressIndex.getType().getType();
+    this.type2Index = new HashMap<>();
     if (ProgressIndexType.HYBRID_PROGRESS_INDEX.getType() != type) {
-      this.type2Index = new HashMap<>();
-      type2Index.put(type, progressIndex);
+      this.type2Index.put(type, progressIndex.deepCopy());
     } else {
-      this.type2Index = ((HybridProgressIndex) progressIndex).type2Index;
+      for (Entry<Short, ProgressIndex> entry :
+          ((HybridProgressIndex) progressIndex).type2Index.entrySet()) {
+        this.type2Index.put(entry.getKey(), entry.getValue().deepCopy());
+      }
     }
   }
 
   public Map<Short, ProgressIndex> getType2Index() {
-    return type2Index;
+    return ImmutableMap.copyOf(((HybridProgressIndex) deepCopy()).type2Index);
   }
 
   @Override
@@ -166,7 +172,12 @@ public class HybridProgressIndex extends ProgressIndex {
 
   @Override
   public int hashCode() {
-    return 0;
+    return Objects.hash(type2Index);
+  }
+
+  @Override
+  public ProgressIndex deepCopy() {
+    return new HybridProgressIndex(this);
   }
 
   @Override
@@ -178,7 +189,7 @@ public class HybridProgressIndex extends ProgressIndex {
       }
 
       if (progressIndex instanceof StateProgressIndex) {
-        return progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this);
+        return 
progressIndex.deepCopy().updateToMinimumEqualOrIsAfterProgressIndex(this);
       }
 
       if (!(progressIndex instanceof HybridProgressIndex)) {
@@ -186,7 +197,7 @@ public class HybridProgressIndex extends ProgressIndex {
             progressIndex.getType().getType(),
             (thisK, thisV) ->
                 (thisV == null
-                    ? progressIndex
+                    ? progressIndex.deepCopy()
                     : 
thisV.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex)));
         return this;
       }
@@ -199,7 +210,7 @@ public class HybridProgressIndex extends ProgressIndex {
                   thatK,
                   (thisK, thisV) ->
                       (thisV == null
-                          ? thatV
+                          ? thatV.deepCopy()
                           : 
thisV.updateToMinimumEqualOrIsAfterProgressIndex(thatV))));
       return this;
     } finally {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index 1d7c31bbc1e..d1fa57c9406 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -32,6 +33,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class IoTProgressIndex extends ProgressIndex {
@@ -45,8 +47,11 @@ public class IoTProgressIndex extends ProgressIndex {
   }
 
   public IoTProgressIndex(Integer peerId, Long searchIndex) {
-    peerId2SearchIndex = new HashMap<>();
-    peerId2SearchIndex.put(peerId, searchIndex);
+    this(ImmutableMap.of(peerId, searchIndex));
+  }
+
+  public IoTProgressIndex(Map<Integer, Long> peerId2SearchIndex) {
+    this.peerId2SearchIndex = new HashMap<>(peerId2SearchIndex);
   }
 
   @Override
@@ -151,7 +156,12 @@ public class IoTProgressIndex extends ProgressIndex {
 
   @Override
   public int hashCode() {
-    return 0;
+    return Objects.hash(peerId2SearchIndex);
+  }
+
+  @Override
+  public ProgressIndex deepCopy() {
+    return new IoTProgressIndex(peerId2SearchIndex);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index 679eecbf0eb..5349f83d294 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class MetaProgressIndex extends ProgressIndex {
@@ -130,7 +131,12 @@ public class MetaProgressIndex extends ProgressIndex {
 
   @Override
   public int hashCode() {
-    return 0;
+    return Objects.hash(index);
+  }
+
+  @Override
+  public ProgressIndex deepCopy() {
+    return new MetaProgressIndex(index);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e476409a30b..cd5b254f74f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -72,9 +72,14 @@ public class MinimumProgressIndex extends ProgressIndex {
     return 0;
   }
 
+  @Override
+  public ProgressIndex deepCopy() {
+    return INSTANCE;
+  }
+
   @Override
   public ProgressIndex 
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
-    return progressIndex == null ? this : progressIndex;
+    return progressIndex == null ? this : progressIndex.deepCopy();
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index 5a567fd4981..3a743bcb5d5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -33,6 +33,8 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -47,15 +49,19 @@ public class RecoverProgressIndex extends ProgressIndex {
   }
 
   public RecoverProgressIndex(int dataNodeId, SimpleProgressIndex 
simpleProgressIndex) {
-    this.dataNodeId2LocalIndex = new HashMap<>();
+    this(ImmutableMap.of(dataNodeId, simpleProgressIndex));
+  }
 
-    dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
+  public RecoverProgressIndex(Map<Integer, SimpleProgressIndex> 
dataNodeId2LocalIndex) {
+    this.dataNodeId2LocalIndex = new HashMap<>();
+    for (Entry<Integer, SimpleProgressIndex> entry : 
dataNodeId2LocalIndex.entrySet()) {
+      this.dataNodeId2LocalIndex.put(
+          entry.getKey(), (SimpleProgressIndex) entry.getValue().deepCopy());
+    }
   }
 
   public Map<Integer, SimpleProgressIndex> getDataNodeId2LocalIndex() {
-    return ImmutableMap.<Integer, SimpleProgressIndex>builder()
-        .putAll(dataNodeId2LocalIndex)
-        .build();
+    return ImmutableMap.copyOf(((RecoverProgressIndex) 
deepCopy()).dataNodeId2LocalIndex);
   }
 
   @Override
@@ -162,7 +168,12 @@ public class RecoverProgressIndex extends ProgressIndex {
 
   @Override
   public int hashCode() {
-    return 0;
+    return Objects.hash(dataNodeId2LocalIndex);
+  }
+
+  @Override
+  public ProgressIndex deepCopy() {
+    return new RecoverProgressIndex(dataNodeId2LocalIndex);
   }
 
   @Override
@@ -181,7 +192,7 @@ public class RecoverProgressIndex extends ProgressIndex {
                   thatK,
                   (thisK, thisV) ->
                       (thisV == null
-                          ? thatV
+                          ? (SimpleProgressIndex) thatV.deepCopy()
                           : (SimpleProgressIndex)
                               
thisV.updateToMinimumEqualOrIsAfterProgressIndex(thatV))));
       return this;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index 677585ce91c..267da7a8b50 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SimpleProgressIndex extends ProgressIndex {
@@ -136,7 +137,12 @@ public class SimpleProgressIndex extends ProgressIndex {
 
   @Override
   public int hashCode() {
-    return 0;
+    return Objects.hash(rebootTimes, memtableFlushOrderId);
+  }
+
+  @Override
+  public ProgressIndex deepCopy() {
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
   }
 
   @Override
@@ -153,7 +159,7 @@ public class SimpleProgressIndex extends ProgressIndex {
         return this;
       }
       if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
-        return progressIndex;
+        return progressIndex.deepCopy();
       }
       // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
       if (thisSimpleProgressIndex.memtableFlushOrderId
@@ -162,7 +168,7 @@ public class SimpleProgressIndex extends ProgressIndex {
       }
       if (thisSimpleProgressIndex.memtableFlushOrderId
           < thatSimpleProgressIndex.memtableFlushOrderId) {
-        return progressIndex;
+        return progressIndex.deepCopy();
       }
       // thisSimpleProgressIndex.memtableFlushOrderId ==
       // thatSimpleProgressIndex.memtableFlushOrderId
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
index 8b44edfe07d..e66dae98d49 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -36,6 +37,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+/**
+ * NOTE: Currently, {@link StateProgressIndex} does not perform deep copies of 
the {@link Binary}
+ * during construction or updates, which may lead to unintended shared state 
or modifications. This
+ * behavior should be reviewed and adjusted as necessary to ensure the 
integrity and independence of
+ * the progress index instances.
+ */
 public class StateProgressIndex extends ProgressIndex {
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -47,8 +54,8 @@ public class StateProgressIndex extends ProgressIndex {
   public StateProgressIndex(
       long version, Map<String, Binary> state, ProgressIndex 
innerProgressIndex) {
     this.version = version;
-    this.state = state;
-    this.innerProgressIndex = innerProgressIndex;
+    this.state = new HashMap<>(state);
+    this.innerProgressIndex = innerProgressIndex.deepCopy();
   }
 
   public long getVersion() {
@@ -56,11 +63,13 @@ public class StateProgressIndex extends ProgressIndex {
   }
 
   public ProgressIndex getInnerProgressIndex() {
-    return innerProgressIndex == null ? MinimumProgressIndex.INSTANCE : 
innerProgressIndex;
+    return innerProgressIndex == null
+        ? MinimumProgressIndex.INSTANCE
+        : innerProgressIndex.deepCopy();
   }
 
   public Map<String, Binary> getState() {
-    return state;
+    return ImmutableMap.copyOf(state);
   }
 
   @Override
@@ -158,6 +167,11 @@ public class StateProgressIndex extends ProgressIndex {
     return Objects.hash(innerProgressIndex, version);
   }
 
+  @Override
+  public ProgressIndex deepCopy() {
+    return new StateProgressIndex(version, state, innerProgressIndex);
+  }
+
   @Override
   public ProgressIndex 
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
     lock.writeLock().lock();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index ea5469bdb25..4195f027681 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -37,6 +38,12 @@ import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+/**
+ * NOTE: Currently, {@link TimeWindowStateProgressIndex} does not perform deep 
copies of the {@link
+ * ByteBuffer} during construction or updates, which may lead to unintended 
shared state or
+ * modifications. This behavior should be reviewed and adjusted as necessary 
to ensure the integrity
+ * and independence of the progress index instances.
+ */
 public class TimeWindowStateProgressIndex extends ProgressIndex {
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -46,7 +53,8 @@ public class TimeWindowStateProgressIndex extends 
ProgressIndex {
 
   public TimeWindowStateProgressIndex(
       @Nonnull Map<String, Pair<Long, ByteBuffer>> 
timeSeries2TimestampWindowBufferPairMap) {
-    this.timeSeries2TimestampWindowBufferPairMap = 
timeSeries2TimestampWindowBufferPairMap;
+    this.timeSeries2TimestampWindowBufferPairMap =
+        new HashMap<>(timeSeries2TimestampWindowBufferPairMap);
   }
 
   private TimeWindowStateProgressIndex() {
@@ -54,7 +62,7 @@ public class TimeWindowStateProgressIndex extends 
ProgressIndex {
   }
 
   public Map<String, Pair<Long, ByteBuffer>> 
getTimeSeries2TimestampWindowBufferPairMap() {
-    return timeSeries2TimestampWindowBufferPairMap;
+    return ImmutableMap.copyOf(timeSeries2TimestampWindowBufferPairMap);
   }
 
   public long getMinTime() {
@@ -186,6 +194,11 @@ public class TimeWindowStateProgressIndex extends 
ProgressIndex {
     return Objects.hash(timeSeries2TimestampWindowBufferPairMap);
   }
 
+  @Override
+  public ProgressIndex deepCopy() {
+    return new 
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap);
+  }
+
   @Override
   public ProgressIndex 
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
     lock.writeLock().lock();

Reply via email to