This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch thread-safely-seriespartitiontable
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/thread-safely-seriespartitiontable by this push:
     new 92f1129fb43 finish
92f1129fb43 is described below

commit 92f1129fb4318fdb840298d549a2700d7e8dfb1f
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 6 15:57:32 2024 +0800

    finish
---
 .../manager/partition/PartitionManager.java        |   2 +-
 .../commons/partition/SeriesPartitionTable.java    | 330 +++++++++++++--------
 2 files changed, 203 insertions(+), 129 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 35d82a5454e..b16314ff5ad 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -466,7 +466,7 @@ public class PartitionManager {
       return getConsensusManager().write(plan);
     } catch (ConsensusException e) {
       // The allocation might fail due to consensus error
-      LOGGER.error("Write DataPartition allocation result failed because: {}", 
status);
+      LOGGER.error("Write partition allocation result failed because: {}", 
status);
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       res.setMessage(e.getMessage());
       return res;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 890e46a451a..b722f564b19 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -42,26 +42,42 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class SeriesPartitionTable {
 
+  private final ReentrantReadWriteLock seriesPartitionMapLock;
   private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
 
   public SeriesPartitionTable() {
+    this.seriesPartitionMapLock = new ReentrantReadWriteLock();
     this.seriesPartitionMap = new TreeMap<>();
   }
 
   public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap) {
+    this.seriesPartitionMapLock = new ReentrantReadWriteLock();
     this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
   }
 
   public Map<TTimePartitionSlot, List<TConsensusGroupId>> 
getSeriesPartitionMap() {
-    return seriesPartitionMap;
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      return seriesPartitionMap;
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 
   public void putDataPartition(TTimePartitionSlot timePartitionSlot, 
TConsensusGroupId groupId) {
-    seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new 
ArrayList<>()).add(groupId);
+    seriesPartitionMapLock.writeLock().lock();
+    try {
+      seriesPartitionMap
+          .computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>())
+          .add(groupId);
+    } finally {
+      seriesPartitionMapLock.writeLock().unlock();
+    }
   }
 
   /**
@@ -74,52 +90,56 @@ public class SeriesPartitionTable {
   public boolean getDataPartition(
       TTimeSlotList partitionSlotList, SeriesPartitionTable 
seriesPartitionTable) {
     AtomicBoolean result = new AtomicBoolean(true);
+    seriesPartitionMapLock.readLock().lock();
     List<TTimePartitionSlot> partitionSlots = 
partitionSlotList.getTimePartitionSlots();
-
-    if (partitionSlots.isEmpty()) {
-      // Return all DataPartitions in one SeriesPartitionSlot
-      // when the queried TimePartitionSlots are empty
-      seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
-    } else {
-      boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
-          isNeedRightAll = partitionSlotList.isNeedRightAll();
-      if (isNeedLeftAll || isNeedRightAll) {
-        // we need to calculate the leftMargin which contains all the time 
partition on the unclosed
-        // left side: (-oo, leftMargin)
-        // and the rightMargin which contains all the time partition on the 
unclosed right side:
-        // (rightMargin, +oo)
-        // all the remaining closed time range which locates in [leftMargin, 
rightMargin] will be
-        // calculated outside if block
-        long leftMargin = isNeedLeftAll ? partitionSlots.get(0).getStartTime() 
: Long.MIN_VALUE,
-            rightMargin =
-                isNeedRightAll
-                    ? partitionSlots.get(partitionSlots.size() - 
1).getStartTime()
-                    : Long.MAX_VALUE;
-        seriesPartitionTable
-            .getSeriesPartitionMap()
-            .putAll(
-                seriesPartitionMap.entrySet().stream()
-                    .filter(
-                        entry -> {
-                          long startTime = entry.getKey().getStartTime();
-                          return startTime < leftMargin || startTime > 
rightMargin;
-                        })
-                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    try {
+      if (partitionSlots.isEmpty()) {
+        // Return all DataPartitions in one SeriesPartitionSlot
+        // when the queried TimePartitionSlots are empty
+        
seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
+      } else {
+        boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
+            isNeedRightAll = partitionSlotList.isNeedRightAll();
+        if (isNeedLeftAll || isNeedRightAll) {
+          // we need to calculate the leftMargin which contains all the time 
partition on the
+          // unclosed
+          // left side: (-oo, leftMargin)
+          // and the rightMargin which contains all the time partition on the 
unclosed right side:
+          // (rightMargin, +oo)
+          // all the remaining closed time range which locates in [leftMargin, 
rightMargin] will be
+          // calculated outside if block
+          long leftMargin = isNeedLeftAll ? 
partitionSlots.get(0).getStartTime() : Long.MIN_VALUE,
+              rightMargin =
+                  isNeedRightAll
+                      ? partitionSlots.get(partitionSlots.size() - 
1).getStartTime()
+                      : Long.MAX_VALUE;
+          seriesPartitionTable
+              .getSeriesPartitionMap()
+              .putAll(
+                  seriesPartitionMap.entrySet().stream()
+                      .filter(
+                          entry -> {
+                            long startTime = entry.getKey().getStartTime();
+                            return startTime < leftMargin || startTime > 
rightMargin;
+                          })
+                      .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+        }
+
+        // Return the DataPartition for each match TimePartitionSlot
+        partitionSlots.forEach(
+            timePartitionSlot -> {
+              if (seriesPartitionMap.containsKey(timePartitionSlot)) {
+                seriesPartitionTable
+                    .getSeriesPartitionMap()
+                    .put(timePartitionSlot, 
seriesPartitionMap.get(timePartitionSlot));
+              } else {
+                result.set(false);
+              }
+            });
       }
-
-      // Return the DataPartition for each match TimePartitionSlot
-      partitionSlots.forEach(
-          timePartitionSlot -> {
-            if (seriesPartitionMap.containsKey(timePartitionSlot)) {
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(timePartitionSlot, 
seriesPartitionMap.get(timePartitionSlot));
-            } else {
-              result.set(false);
-            }
-          });
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
     }
-
     return result.get();
   }
 
@@ -130,8 +150,13 @@ public class SeriesPartitionTable {
    * @return The specified DataPartition's successor if exists, null otherwise
    */
   public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot 
timePartitionSlot) {
-    TTimePartitionSlot successorSlot = 
seriesPartitionMap.higherKey(timePartitionSlot);
-    return successorSlot == null ? null : 
seriesPartitionMap.get(successorSlot).get(0);
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      TTimePartitionSlot successorSlot = 
seriesPartitionMap.higherKey(timePartitionSlot);
+      return successorSlot == null ? null : 
seriesPartitionMap.get(successorSlot).get(0);
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 
   /**
@@ -141,8 +166,13 @@ public class SeriesPartitionTable {
    * @return The specified DataPartition's predecessor if exists, null 
otherwise
    */
   public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot 
timePartitionSlot) {
-    TTimePartitionSlot predecessorSlot = 
seriesPartitionMap.lowerKey(timePartitionSlot);
-    return predecessorSlot == null ? null : 
seriesPartitionMap.get(predecessorSlot).get(0);
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      TTimePartitionSlot predecessorSlot = 
seriesPartitionMap.lowerKey(timePartitionSlot);
+      return predecessorSlot == null ? null : 
seriesPartitionMap.get(predecessorSlot).get(0);
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 
   /**
@@ -153,28 +183,38 @@ public class SeriesPartitionTable {
    * @return the timePartition's corresponding dataRegionIds. return the 
dataRegions which
    *     timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
    */
-  List<TConsensusGroupId> getRegionId(
+  public List<TConsensusGroupId> getRegionId(
       TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
-    return seriesPartitionMap.entrySet().stream()
-        .filter(
-            entry ->
-                entry.getKey().getStartTime() >= startTimeSlotId.getStartTime()
-                    && entry.getKey().getStartTime() <= 
endTimeSlotId.getStartTime())
-        .flatMap(entry -> entry.getValue().stream())
-        .collect(Collectors.toList());
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      return seriesPartitionMap.entrySet().stream()
+          .filter(
+              entry ->
+                  entry.getKey().getStartTime() >= 
startTimeSlotId.getStartTime()
+                      && entry.getKey().getStartTime() <= 
endTimeSlotId.getStartTime())
+          .flatMap(entry -> entry.getValue().stream())
+          .collect(Collectors.toList());
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 
-  List<TTimePartitionSlot> getTimeSlotList(
+  public List<TTimePartitionSlot> getTimeSlotList(
       TConsensusGroupId regionId, long startTime, long endTime) {
-    if (regionId.getId() == -1) {
-      return seriesPartitionMap.keySet().stream()
-          .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < 
endTime)
-          .collect(Collectors.toList());
-    } else {
-      return seriesPartitionMap.keySet().stream()
-          .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < 
endTime)
-          .filter(e -> seriesPartitionMap.get(e).contains(regionId))
-          .collect(Collectors.toList());
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      if (regionId.getId() == -1) {
+        return seriesPartitionMap.keySet().stream()
+            .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < 
endTime)
+            .collect(Collectors.toList());
+      } else {
+        return seriesPartitionMap.keySet().stream()
+            .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < 
endTime)
+            .filter(e -> seriesPartitionMap.get(e).contains(regionId))
+            .collect(Collectors.toList());
+      }
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
     }
   }
 
@@ -190,18 +230,23 @@ public class SeriesPartitionTable {
       SeriesPartitionTable assignedSeriesPartitionTable,
       TSeriesPartitionSlot seriesPartitionSlot,
       Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> 
groupDeltaMap) {
-    assignedSeriesPartitionTable
-        .getSeriesPartitionMap()
-        .forEach(
-            ((timePartitionSlot, consensusGroupIds) -> {
-              seriesPartitionMap.put(timePartitionSlot, new 
Vector<>(consensusGroupIds));
-              consensusGroupIds.forEach(
-                  consensusGroupId ->
-                      groupDeltaMap
-                          .computeIfAbsent(consensusGroupId, empty -> new 
ConcurrentHashMap<>())
-                          .computeIfAbsent(seriesPartitionSlot, empty -> new 
AtomicLong(0))
-                          .getAndIncrement());
-            }));
+    seriesPartitionMapLock.writeLock().lock();
+    try {
+      assignedSeriesPartitionTable
+          .getSeriesPartitionMap()
+          .forEach(
+              ((timePartitionSlot, consensusGroupIds) -> {
+                seriesPartitionMap.put(timePartitionSlot, new 
Vector<>(consensusGroupIds));
+                consensusGroupIds.forEach(
+                    consensusGroupId ->
+                        groupDeltaMap
+                            .computeIfAbsent(consensusGroupId, empty -> new 
ConcurrentHashMap<>())
+                            .computeIfAbsent(seriesPartitionSlot, empty -> new 
AtomicLong(0))
+                            .getAndIncrement());
+              }));
+    } finally {
+      seriesPartitionMapLock.writeLock().unlock();
+    }
   }
 
   /**
@@ -213,15 +258,18 @@ public class SeriesPartitionTable {
    */
   public synchronized List<TTimePartitionSlot> 
filterUnassignedDataPartitionSlots(
       List<TTimePartitionSlot> partitionSlots) {
+    seriesPartitionMapLock.readLock().lock();
     List<TTimePartitionSlot> result = new Vector<>();
-
-    partitionSlots.forEach(
-        timePartitionSlot -> {
-          if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
-            result.add(timePartitionSlot);
-          }
-        });
-
+    try {
+      partitionSlots.forEach(
+          timePartitionSlot -> {
+            if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
+              result.add(timePartitionSlot);
+            }
+          });
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
     return result;
   }
 
@@ -231,74 +279,100 @@ public class SeriesPartitionTable {
    * @return The last DataPartition's ConsensusGroupId, null if there are no 
DataPartitions yet
    */
   public TConsensusGroupId getLastConsensusGroupId() {
-    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
-        seriesPartitionMap.lastEntry();
-    if (lastEntry == null) {
-      return null;
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+          seriesPartitionMap.lastEntry();
+      if (lastEntry == null) {
+        return null;
+      }
+      return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
     }
-    return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
   }
 
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
-    ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
-    for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionEntry :
-        seriesPartitionMap.entrySet()) {
-      seriesPartitionEntry.getKey().write(protocol);
-      ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(), 
outputStream);
-      for (TConsensusGroupId consensusGroupId : 
seriesPartitionEntry.getValue()) {
-        consensusGroupId.write(protocol);
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
+      for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionEntry :
+          seriesPartitionMap.entrySet()) {
+        seriesPartitionEntry.getKey().write(protocol);
+        ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(), 
outputStream);
+        for (TConsensusGroupId consensusGroupId : 
seriesPartitionEntry.getValue()) {
+          consensusGroupId.write(protocol);
+        }
       }
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
     }
   }
 
   /** Only for ConsensusRequest. */
   public void deserialize(ByteBuffer buffer) {
-    int timePartitionSlotNum = buffer.getInt();
-    for (int i = 0; i < timePartitionSlotNum; i++) {
-      TTimePartitionSlot timePartitionSlot =
-          ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
-
-      int consensusGroupIdNum = buffer.getInt();
-      List<TConsensusGroupId> consensusGroupIds = new Vector<>();
-      for (int j = 0; j < consensusGroupIdNum; j++) {
-        
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+    seriesPartitionMapLock.writeLock().lock();
+    try {
+      int timePartitionSlotNum = buffer.getInt();
+      for (int i = 0; i < timePartitionSlotNum; i++) {
+        TTimePartitionSlot timePartitionSlot =
+            ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
+        int consensusGroupIdNum = buffer.getInt();
+        List<TConsensusGroupId> consensusGroupIds = new Vector<>();
+        for (int j = 0; j < consensusGroupIdNum; j++) {
+          
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+        }
+        seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
       }
-
-      seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
+    } finally {
+      seriesPartitionMapLock.writeLock().unlock();
     }
   }
 
   /** Only for Snapshot. */
   public void deserialize(InputStream inputStream, TProtocol protocol)
       throws IOException, TException {
-    int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < timePartitionSlotNum; i++) {
-      TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
-      timePartitionSlot.read(protocol);
-
-      int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
-      List<TConsensusGroupId> consensusGroupIds = new Vector<>();
-      for (int j = 0; j < consensusGroupIdNum; j++) {
-        TConsensusGroupId consensusGroupId = new TConsensusGroupId();
-        consensusGroupId.read(protocol);
-        consensusGroupIds.add(consensusGroupId);
+    seriesPartitionMapLock.writeLock().lock();
+    try {
+      int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
+      for (int i = 0; i < timePartitionSlotNum; i++) {
+        TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+        timePartitionSlot.read(protocol);
+        int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
+        List<TConsensusGroupId> consensusGroupIds = new Vector<>();
+        for (int j = 0; j < consensusGroupIdNum; j++) {
+          TConsensusGroupId consensusGroupId = new TConsensusGroupId();
+          consensusGroupId.read(protocol);
+          consensusGroupIds.add(consensusGroupId);
+        }
+        seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
       }
-
-      seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
+    } finally {
+      seriesPartitionMapLock.writeLock().unlock();
     }
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    SeriesPartitionTable that = (SeriesPartitionTable) o;
-    return seriesPartitionMap.equals(that.seriesPartitionMap);
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      SeriesPartitionTable that = (SeriesPartitionTable) o;
+      return seriesPartitionMap.equals(that.seriesPartitionMap);
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(seriesPartitionMap);
+    seriesPartitionMapLock.readLock().lock();
+    try {
+      return Objects.hash(seriesPartitionMap);
+    } finally {
+      seriesPartitionMapLock.readLock().unlock();
+    }
   }
 }

Reply via email to