This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a2bec2bde2 Thread safely SeriesPartitionTable (#12679)
6a2bec2bde2 is described below

commit 6a2bec2bde2dae1c53f124c19a5b34bd4611b06c
Author: Yongzao <[email protected]>
AuthorDate: Fri Jun 7 18:14:20 2024 +0800

    Thread safely SeriesPartitionTable (#12679)
    
    * finish
    
    * use concurrentskiplist
    
    * bug fix
    
    ---------
    
    Co-authored-by: OneSizeFitQuorum <[email protected]>
---
 .mvn/develocity.xml                                |  4 +---
 .../manager/partition/PartitionManager.java        |  2 +-
 .../commons/partition/SeriesPartitionTable.java    | 28 ++++++++--------------
 3 files changed, 12 insertions(+), 22 deletions(-)

diff --git a/.mvn/develocity.xml b/.mvn/develocity.xml
index 1e82305ef93..b505d1a3666 100644
--- a/.mvn/develocity.xml
+++ b/.mvn/develocity.xml
@@ -26,9 +26,7 @@
     <buildScan>
         
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
         <publishing>
-            <onlyIf>
-                <![CDATA[authenticated]]>
-            </onlyIf>
+            <onlyIf><![CDATA[authenticated]]></onlyIf>
         </publishing>
         <obfuscation>
             <ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 35d82a5454e..b16314ff5ad 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -466,7 +466,7 @@ public class PartitionManager {
       return getConsensusManager().write(plan);
     } catch (ConsensusException e) {
       // The allocation might fail due to consensus error
-      LOGGER.error("Write DataPartition allocation result failed because: {}", 
status);
+      LOGGER.error("Write partition allocation result failed because: {}", 
status);
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       res.setMessage(e.getMessage());
       return res;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 890e46a451a..494da226e83 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -33,27 +33,27 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class SeriesPartitionTable {
 
-  private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
+  private final ConcurrentSkipListMap<TTimePartitionSlot, 
List<TConsensusGroupId>>
+      seriesPartitionMap;
 
   public SeriesPartitionTable() {
-    this.seriesPartitionMap = new TreeMap<>();
+    this.seriesPartitionMap = new ConcurrentSkipListMap<>();
   }
 
   public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap) {
-    this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
+    this.seriesPartitionMap = new ConcurrentSkipListMap<>(seriesPartitionMap);
   }
 
   public Map<TTimePartitionSlot, List<TConsensusGroupId>> 
getSeriesPartitionMap() {
@@ -61,7 +61,7 @@ public class SeriesPartitionTable {
   }
 
   public void putDataPartition(TTimePartitionSlot timePartitionSlot, 
TConsensusGroupId groupId) {
-    seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new 
ArrayList<>()).add(groupId);
+    seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new 
Vector<>()).add(groupId);
   }
 
   /**
@@ -75,7 +75,6 @@ public class SeriesPartitionTable {
       TTimeSlotList partitionSlotList, SeriesPartitionTable 
seriesPartitionTable) {
     AtomicBoolean result = new AtomicBoolean(true);
     List<TTimePartitionSlot> partitionSlots = 
partitionSlotList.getTimePartitionSlots();
-
     if (partitionSlots.isEmpty()) {
       // Return all DataPartitions in one SeriesPartitionSlot
       // when the queried TimePartitionSlots are empty
@@ -84,7 +83,8 @@ public class SeriesPartitionTable {
       boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
           isNeedRightAll = partitionSlotList.isNeedRightAll();
       if (isNeedLeftAll || isNeedRightAll) {
-        // we need to calculate the leftMargin which contains all the time 
partition on the unclosed
+        // we need to calculate the leftMargin which contains all the time 
partition on the
+        // unclosed
         // left side: (-oo, leftMargin)
         // and the rightMargin which contains all the time partition on the 
unclosed right side:
         // (rightMargin, +oo)
@@ -106,7 +106,6 @@ public class SeriesPartitionTable {
                         })
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
       }
-
       // Return the DataPartition for each match TimePartitionSlot
       partitionSlots.forEach(
           timePartitionSlot -> {
@@ -119,7 +118,6 @@ public class SeriesPartitionTable {
             }
           });
     }
-
     return result.get();
   }
 
@@ -153,7 +151,7 @@ public class SeriesPartitionTable {
    * @return the timePartition's corresponding dataRegionIds. return the 
dataRegions which
    *     timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
    */
-  List<TConsensusGroupId> getRegionId(
+  public List<TConsensusGroupId> getRegionId(
       TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
     return seriesPartitionMap.entrySet().stream()
         .filter(
@@ -164,7 +162,7 @@ public class SeriesPartitionTable {
         .collect(Collectors.toList());
   }
 
-  List<TTimePartitionSlot> getTimeSlotList(
+  public List<TTimePartitionSlot> getTimeSlotList(
       TConsensusGroupId regionId, long startTime, long endTime) {
     if (regionId.getId() == -1) {
       return seriesPartitionMap.keySet().stream()
@@ -214,14 +212,12 @@ public class SeriesPartitionTable {
   public synchronized List<TTimePartitionSlot> 
filterUnassignedDataPartitionSlots(
       List<TTimePartitionSlot> partitionSlots) {
     List<TTimePartitionSlot> result = new Vector<>();
-
     partitionSlots.forEach(
         timePartitionSlot -> {
           if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
             result.add(timePartitionSlot);
           }
         });
-
     return result;
   }
 
@@ -258,13 +254,11 @@ public class SeriesPartitionTable {
     for (int i = 0; i < timePartitionSlotNum; i++) {
       TTimePartitionSlot timePartitionSlot =
           ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
-
       int consensusGroupIdNum = buffer.getInt();
       List<TConsensusGroupId> consensusGroupIds = new Vector<>();
       for (int j = 0; j < consensusGroupIdNum; j++) {
         
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
       }
-
       seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
     }
   }
@@ -276,7 +270,6 @@ public class SeriesPartitionTable {
     for (int i = 0; i < timePartitionSlotNum; i++) {
       TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
       timePartitionSlot.read(protocol);
-
       int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
       List<TConsensusGroupId> consensusGroupIds = new Vector<>();
       for (int j = 0; j < consensusGroupIdNum; j++) {
@@ -284,7 +277,6 @@ public class SeriesPartitionTable {
         consensusGroupId.read(protocol);
         consensusGroupIds.add(consensusGroupId);
       }
-
       seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
     }
   }

Reply via email to