This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a2bec2bde2 Thread safely SeriesPartitionTable (#12679)
6a2bec2bde2 is described below
commit 6a2bec2bde2dae1c53f124c19a5b34bd4611b06c
Author: Yongzao <[email protected]>
AuthorDate: Fri Jun 7 18:14:20 2024 +0800
Thread safely SeriesPartitionTable (#12679)
* finish
* use concurrentskiplist
* bug fix
---------
Co-authored-by: OneSizeFitQuorum <[email protected]>
---
.mvn/develocity.xml | 4 +---
.../manager/partition/PartitionManager.java | 2 +-
.../commons/partition/SeriesPartitionTable.java | 28 ++++++++--------------
3 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/.mvn/develocity.xml b/.mvn/develocity.xml
index 1e82305ef93..b505d1a3666 100644
--- a/.mvn/develocity.xml
+++ b/.mvn/develocity.xml
@@ -26,9 +26,7 @@
<buildScan>
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
<publishing>
- <onlyIf>
- <![CDATA[authenticated]]>
- </onlyIf>
+ <onlyIf><![CDATA[authenticated]]></onlyIf>
</publishing>
<obfuscation>
<ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 35d82a5454e..b16314ff5ad 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -466,7 +466,7 @@ public class PartitionManager {
return getConsensusManager().write(plan);
} catch (ConsensusException e) {
// The allocation might fail due to consensus error
- LOGGER.error("Write DataPartition allocation result failed because: {}",
status);
+ LOGGER.error("Write partition allocation result failed because: {}",
status);
TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 890e46a451a..494da226e83 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -33,27 +33,27 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SeriesPartitionTable {
- private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap;
+ private final ConcurrentSkipListMap<TTimePartitionSlot,
List<TConsensusGroupId>>
+ seriesPartitionMap;
public SeriesPartitionTable() {
- this.seriesPartitionMap = new TreeMap<>();
+ this.seriesPartitionMap = new ConcurrentSkipListMap<>();
}
public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap) {
- this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
+ this.seriesPartitionMap = new ConcurrentSkipListMap<>(seriesPartitionMap);
}
public Map<TTimePartitionSlot, List<TConsensusGroupId>>
getSeriesPartitionMap() {
@@ -61,7 +61,7 @@ public class SeriesPartitionTable {
}
public void putDataPartition(TTimePartitionSlot timePartitionSlot,
TConsensusGroupId groupId) {
- seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
ArrayList<>()).add(groupId);
+ seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
Vector<>()).add(groupId);
}
/**
@@ -75,7 +75,6 @@ public class SeriesPartitionTable {
TTimeSlotList partitionSlotList, SeriesPartitionTable
seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
List<TTimePartitionSlot> partitionSlots =
partitionSlotList.getTimePartitionSlots();
-
if (partitionSlots.isEmpty()) {
// Return all DataPartitions in one SeriesPartitionSlot
// when the queried TimePartitionSlots are empty
@@ -84,7 +83,8 @@ public class SeriesPartitionTable {
boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
isNeedRightAll = partitionSlotList.isNeedRightAll();
if (isNeedLeftAll || isNeedRightAll) {
- // we need to calculate the leftMargin which contains all the time partition on the unclosed
+ // we need to calculate the leftMargin which contains all the time partition on the
+ // unclosed
// left side: (-oo, leftMargin)
// and the rightMargin which contains all the time partition on the unclosed right side:
// (rightMargin, +oo)
@@ -106,7 +106,6 @@ public class SeriesPartitionTable {
})
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
}
-
// Return the DataPartition for each match TimePartitionSlot
partitionSlots.forEach(
timePartitionSlot -> {
@@ -119,7 +118,6 @@ public class SeriesPartitionTable {
}
});
}
-
return result.get();
}
@@ -153,7 +151,7 @@ public class SeriesPartitionTable {
* @return the timePartition's corresponding dataRegionIds. return the
dataRegions which
* timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
*/
- List<TConsensusGroupId> getRegionId(
+ public List<TConsensusGroupId> getRegionId(
TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
return seriesPartitionMap.entrySet().stream()
.filter(
@@ -164,7 +162,7 @@ public class SeriesPartitionTable {
.collect(Collectors.toList());
}
- List<TTimePartitionSlot> getTimeSlotList(
+ public List<TTimePartitionSlot> getTimeSlotList(
TConsensusGroupId regionId, long startTime, long endTime) {
if (regionId.getId() == -1) {
return seriesPartitionMap.keySet().stream()
@@ -214,14 +212,12 @@ public class SeriesPartitionTable {
public synchronized List<TTimePartitionSlot>
filterUnassignedDataPartitionSlots(
List<TTimePartitionSlot> partitionSlots) {
List<TTimePartitionSlot> result = new Vector<>();
-
partitionSlots.forEach(
timePartitionSlot -> {
if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
result.add(timePartitionSlot);
}
});
-
return result;
}
@@ -258,13 +254,11 @@ public class SeriesPartitionTable {
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
-
int consensusGroupIdNum = buffer.getInt();
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
}
-
seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
@@ -276,7 +270,6 @@ public class SeriesPartitionTable {
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
timePartitionSlot.read(protocol);
-
int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
@@ -284,7 +277,6 @@ public class SeriesPartitionTable {
consensusGroupId.read(protocol);
consensusGroupIds.add(consensusGroupId);
}
-
seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}