This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch thread-safely-seriespartitiontable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/thread-safely-seriespartitiontable by this push:
new 92f1129fb43 finish
92f1129fb43 is described below
commit 92f1129fb4318fdb840298d549a2700d7e8dfb1f
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 6 15:57:32 2024 +0800
finish
---
.../manager/partition/PartitionManager.java | 2 +-
.../commons/partition/SeriesPartitionTable.java | 330 +++++++++++++--------
2 files changed, 203 insertions(+), 129 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 35d82a5454e..b16314ff5ad 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -466,7 +466,7 @@ public class PartitionManager {
return getConsensusManager().write(plan);
} catch (ConsensusException e) {
// The allocation might fail due to consensus error
- LOGGER.error("Write DataPartition allocation result failed because: {}",
status);
+ LOGGER.error("Write partition allocation result failed because: {}",
status);
TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 890e46a451a..b722f564b19 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -42,26 +42,42 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class SeriesPartitionTable {
+ private final ReentrantReadWriteLock seriesPartitionMapLock;
private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap;
public SeriesPartitionTable() {
+ this.seriesPartitionMapLock = new ReentrantReadWriteLock();
this.seriesPartitionMap = new TreeMap<>();
}
public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap) {
+ this.seriesPartitionMapLock = new ReentrantReadWriteLock();
this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
}
public Map<TTimePartitionSlot, List<TConsensusGroupId>>
getSeriesPartitionMap() {
- return seriesPartitionMap;
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ return seriesPartitionMap;
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
public void putDataPartition(TTimePartitionSlot timePartitionSlot,
TConsensusGroupId groupId) {
- seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new
ArrayList<>()).add(groupId);
+ seriesPartitionMapLock.writeLock().lock();
+ try {
+ seriesPartitionMap
+ .computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>())
+ .add(groupId);
+ } finally {
+ seriesPartitionMapLock.writeLock().unlock();
+ }
}
/**
@@ -74,52 +90,56 @@ public class SeriesPartitionTable {
public boolean getDataPartition(
TTimeSlotList partitionSlotList, SeriesPartitionTable
seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
+ seriesPartitionMapLock.readLock().lock();
List<TTimePartitionSlot> partitionSlots =
partitionSlotList.getTimePartitionSlots();
-
- if (partitionSlots.isEmpty()) {
- // Return all DataPartitions in one SeriesPartitionSlot
- // when the queried TimePartitionSlots are empty
- seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
- } else {
- boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
- isNeedRightAll = partitionSlotList.isNeedRightAll();
- if (isNeedLeftAll || isNeedRightAll) {
- // we need to calculate the leftMargin which contains all the time
partition on the unclosed
- // left side: (-oo, leftMargin)
- // and the rightMargin which contains all the time partition on the
unclosed right side:
- // (rightMargin, +oo)
- // all the remaining closed time range which locates in [leftMargin,
rightMargin] will be
- // calculated outside if block
- long leftMargin = isNeedLeftAll ? partitionSlots.get(0).getStartTime()
: Long.MIN_VALUE,
- rightMargin =
- isNeedRightAll
- ? partitionSlots.get(partitionSlots.size() -
1).getStartTime()
- : Long.MAX_VALUE;
- seriesPartitionTable
- .getSeriesPartitionMap()
- .putAll(
- seriesPartitionMap.entrySet().stream()
- .filter(
- entry -> {
- long startTime = entry.getKey().getStartTime();
- return startTime < leftMargin || startTime >
rightMargin;
- })
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
+ try {
+ if (partitionSlots.isEmpty()) {
+ // Return all DataPartitions in one SeriesPartitionSlot
+ // when the queried TimePartitionSlots are empty
+
seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
+ } else {
+ boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
+ isNeedRightAll = partitionSlotList.isNeedRightAll();
+ if (isNeedLeftAll || isNeedRightAll) {
+ // we need to calculate the leftMargin which contains all the time
partition on the
+ // unclosed
+ // left side: (-oo, leftMargin)
+ // and the rightMargin which contains all the time partition on the
unclosed right side:
+ // (rightMargin, +oo)
+ // all the remaining closed time range which locates in [leftMargin,
rightMargin] will be
+ // calculated outside if block
+ long leftMargin = isNeedLeftAll ?
partitionSlots.get(0).getStartTime() : Long.MIN_VALUE,
+ rightMargin =
+ isNeedRightAll
+ ? partitionSlots.get(partitionSlots.size() -
1).getStartTime()
+ : Long.MAX_VALUE;
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .putAll(
+ seriesPartitionMap.entrySet().stream()
+ .filter(
+ entry -> {
+ long startTime = entry.getKey().getStartTime();
+ return startTime < leftMargin || startTime >
rightMargin;
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
+ }
+
+ // Return the DataPartition for each match TimePartitionSlot
+ partitionSlots.forEach(
+ timePartitionSlot -> {
+ if (seriesPartitionMap.containsKey(timePartitionSlot)) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot,
seriesPartitionMap.get(timePartitionSlot));
+ } else {
+ result.set(false);
+ }
+ });
}
-
- // Return the DataPartition for each match TimePartitionSlot
- partitionSlots.forEach(
- timePartitionSlot -> {
- if (seriesPartitionMap.containsKey(timePartitionSlot)) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot,
seriesPartitionMap.get(timePartitionSlot));
- } else {
- result.set(false);
- }
- });
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
}
-
return result.get();
}
@@ -130,8 +150,13 @@ public class SeriesPartitionTable {
* @return The specified DataPartition's successor if exists, null otherwise
*/
public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot
timePartitionSlot) {
- TTimePartitionSlot successorSlot =
seriesPartitionMap.higherKey(timePartitionSlot);
- return successorSlot == null ? null :
seriesPartitionMap.get(successorSlot).get(0);
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ TTimePartitionSlot successorSlot =
seriesPartitionMap.higherKey(timePartitionSlot);
+ return successorSlot == null ? null :
seriesPartitionMap.get(successorSlot).get(0);
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
/**
@@ -141,8 +166,13 @@ public class SeriesPartitionTable {
* @return The specified DataPartition's predecessor if exists, null
otherwise
*/
public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot
timePartitionSlot) {
- TTimePartitionSlot predecessorSlot =
seriesPartitionMap.lowerKey(timePartitionSlot);
- return predecessorSlot == null ? null :
seriesPartitionMap.get(predecessorSlot).get(0);
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ TTimePartitionSlot predecessorSlot =
seriesPartitionMap.lowerKey(timePartitionSlot);
+ return predecessorSlot == null ? null :
seriesPartitionMap.get(predecessorSlot).get(0);
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
/**
@@ -153,28 +183,38 @@ public class SeriesPartitionTable {
* @return the timePartition's corresponding dataRegionIds. return the
dataRegions which
* timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
*/
- List<TConsensusGroupId> getRegionId(
+ public List<TConsensusGroupId> getRegionId(
TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
- return seriesPartitionMap.entrySet().stream()
- .filter(
- entry ->
- entry.getKey().getStartTime() >= startTimeSlotId.getStartTime()
- && entry.getKey().getStartTime() <=
endTimeSlotId.getStartTime())
- .flatMap(entry -> entry.getValue().stream())
- .collect(Collectors.toList());
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ return seriesPartitionMap.entrySet().stream()
+ .filter(
+ entry ->
+ entry.getKey().getStartTime() >=
startTimeSlotId.getStartTime()
+ && entry.getKey().getStartTime() <=
endTimeSlotId.getStartTime())
+ .flatMap(entry -> entry.getValue().stream())
+ .collect(Collectors.toList());
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
- List<TTimePartitionSlot> getTimeSlotList(
+ public List<TTimePartitionSlot> getTimeSlotList(
TConsensusGroupId regionId, long startTime, long endTime) {
- if (regionId.getId() == -1) {
- return seriesPartitionMap.keySet().stream()
- .filter(e -> e.getStartTime() >= startTime && e.getStartTime() <
endTime)
- .collect(Collectors.toList());
- } else {
- return seriesPartitionMap.keySet().stream()
- .filter(e -> e.getStartTime() >= startTime && e.getStartTime() <
endTime)
- .filter(e -> seriesPartitionMap.get(e).contains(regionId))
- .collect(Collectors.toList());
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ if (regionId.getId() == -1) {
+ return seriesPartitionMap.keySet().stream()
+ .filter(e -> e.getStartTime() >= startTime && e.getStartTime() <
endTime)
+ .collect(Collectors.toList());
+ } else {
+ return seriesPartitionMap.keySet().stream()
+ .filter(e -> e.getStartTime() >= startTime && e.getStartTime() <
endTime)
+ .filter(e -> seriesPartitionMap.get(e).contains(regionId))
+ .collect(Collectors.toList());
+ }
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
}
}
@@ -190,18 +230,23 @@ public class SeriesPartitionTable {
SeriesPartitionTable assignedSeriesPartitionTable,
TSeriesPartitionSlot seriesPartitionSlot,
Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap) {
- assignedSeriesPartitionTable
- .getSeriesPartitionMap()
- .forEach(
- ((timePartitionSlot, consensusGroupIds) -> {
- seriesPartitionMap.put(timePartitionSlot, new
Vector<>(consensusGroupIds));
- consensusGroupIds.forEach(
- consensusGroupId ->
- groupDeltaMap
- .computeIfAbsent(consensusGroupId, empty -> new
ConcurrentHashMap<>())
- .computeIfAbsent(seriesPartitionSlot, empty -> new
AtomicLong(0))
- .getAndIncrement());
- }));
+ seriesPartitionMapLock.writeLock().lock();
+ try {
+ assignedSeriesPartitionTable
+ .getSeriesPartitionMap()
+ .forEach(
+ ((timePartitionSlot, consensusGroupIds) -> {
+ seriesPartitionMap.put(timePartitionSlot, new
Vector<>(consensusGroupIds));
+ consensusGroupIds.forEach(
+ consensusGroupId ->
+ groupDeltaMap
+ .computeIfAbsent(consensusGroupId, empty -> new
ConcurrentHashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, empty -> new
AtomicLong(0))
+ .getAndIncrement());
+ }));
+ } finally {
+ seriesPartitionMapLock.writeLock().unlock();
+ }
}
/**
@@ -213,15 +258,18 @@ public class SeriesPartitionTable {
*/
public synchronized List<TTimePartitionSlot>
filterUnassignedDataPartitionSlots(
List<TTimePartitionSlot> partitionSlots) {
+ seriesPartitionMapLock.readLock().lock();
List<TTimePartitionSlot> result = new Vector<>();
-
- partitionSlots.forEach(
- timePartitionSlot -> {
- if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
- result.add(timePartitionSlot);
- }
- });
-
+ try {
+ partitionSlots.forEach(
+ timePartitionSlot -> {
+ if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
+ result.add(timePartitionSlot);
+ }
+ });
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
return result;
}
@@ -231,74 +279,100 @@ public class SeriesPartitionTable {
* @return The last DataPartition's ConsensusGroupId, null if there are no
DataPartitions yet
*/
public TConsensusGroupId getLastConsensusGroupId() {
- Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
- seriesPartitionMap.lastEntry();
- if (lastEntry == null) {
- return null;
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+ seriesPartitionMap.lastEntry();
+ if (lastEntry == null) {
+ return null;
+ }
+ return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
}
- return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
}
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
- ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
- for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionEntry :
- seriesPartitionMap.entrySet()) {
- seriesPartitionEntry.getKey().write(protocol);
- ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(),
outputStream);
- for (TConsensusGroupId consensusGroupId :
seriesPartitionEntry.getValue()) {
- consensusGroupId.write(protocol);
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
+ for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionEntry :
+ seriesPartitionMap.entrySet()) {
+ seriesPartitionEntry.getKey().write(protocol);
+ ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(),
outputStream);
+ for (TConsensusGroupId consensusGroupId :
seriesPartitionEntry.getValue()) {
+ consensusGroupId.write(protocol);
+ }
}
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
}
}
/** Only for ConsensusRequest. */
public void deserialize(ByteBuffer buffer) {
- int timePartitionSlotNum = buffer.getInt();
- for (int i = 0; i < timePartitionSlotNum; i++) {
- TTimePartitionSlot timePartitionSlot =
- ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
-
- int consensusGroupIdNum = buffer.getInt();
- List<TConsensusGroupId> consensusGroupIds = new Vector<>();
- for (int j = 0; j < consensusGroupIdNum; j++) {
-
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+ seriesPartitionMapLock.writeLock().lock();
+ try {
+ int timePartitionSlotNum = buffer.getInt();
+ for (int i = 0; i < timePartitionSlotNum; i++) {
+ TTimePartitionSlot timePartitionSlot =
+ ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
+ int consensusGroupIdNum = buffer.getInt();
+ List<TConsensusGroupId> consensusGroupIds = new Vector<>();
+ for (int j = 0; j < consensusGroupIdNum; j++) {
+
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
+ }
+ seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
-
- seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
+ } finally {
+ seriesPartitionMapLock.writeLock().unlock();
}
}
/** Only for Snapshot. */
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
- int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < timePartitionSlotNum; i++) {
- TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
- timePartitionSlot.read(protocol);
-
- int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
- List<TConsensusGroupId> consensusGroupIds = new Vector<>();
- for (int j = 0; j < consensusGroupIdNum; j++) {
- TConsensusGroupId consensusGroupId = new TConsensusGroupId();
- consensusGroupId.read(protocol);
- consensusGroupIds.add(consensusGroupId);
+ seriesPartitionMapLock.writeLock().lock();
+ try {
+ int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < timePartitionSlotNum; i++) {
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+ timePartitionSlot.read(protocol);
+ int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
+ List<TConsensusGroupId> consensusGroupIds = new Vector<>();
+ for (int j = 0; j < consensusGroupIdNum; j++) {
+ TConsensusGroupId consensusGroupId = new TConsensusGroupId();
+ consensusGroupId.read(protocol);
+ consensusGroupIds.add(consensusGroupId);
+ }
+ seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
-
- seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
+ } finally {
+ seriesPartitionMapLock.writeLock().unlock();
}
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- SeriesPartitionTable that = (SeriesPartitionTable) o;
- return seriesPartitionMap.equals(that.seriesPartitionMap);
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SeriesPartitionTable that = (SeriesPartitionTable) o;
+ return seriesPartitionMap.equals(that.seriesPartitionMap);
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
@Override
public int hashCode() {
- return Objects.hash(seriesPartitionMap);
+ seriesPartitionMapLock.readLock().lock();
+ try {
+ return Objects.hash(seriesPartitionMap);
+ } finally {
+ seriesPartitionMapLock.readLock().unlock();
+ }
}
}