This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2e93bc6b36 [IOTDB-4377] Fix TTimePartitionSlot count and DataPartition
inherit policy bug (#7287)
2e93bc6b36 is described below
commit 2e93bc6b361b5e34cb19dc2757ffe3c3b44c56de
Author: YongzaoDan <[email protected]>
AuthorDate: Tue Sep 13 15:34:13 2022 +0800
[IOTDB-4377] Fix TTimePartitionSlot count and DataPartition inherit policy
bug (#7287)
---
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/manager/PartitionManager.java | 17 +--
.../partition/GreedyPartitionAllocator.java | 93 +++++++++---
.../persistence/partition/PartitionInfo.java | 9 +-
.../persistence/partition/RegionGroup.java | 68 +++++++--
.../partition/StorageGroupPartitionTable.java | 88 ++++++------
.../IoTDBClusterPartitionTableTest.java | 160 +++++++++++++--------
.../db/it/{ => confignode}/IoTDBConfigNodeIT.java | 2 +-
.../commons/partition/DataPartitionTable.java | 14 +-
.../commons/partition/SchemaPartitionTable.java | 18 +--
.../commons/partition/SeriesPartitionTable.java | 19 ++-
11 files changed, 312 insertions(+), 178 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index dcf3149530..f6136d558d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -799,7 +799,7 @@ public class ConfigManager implements IManager {
if (!allLeadership.isEmpty()) {
String regionType =
regionInfo.getDataNodeId()
- ==
allLeadership.get(regionInfo.getConsensusGroupId())
+ ==
allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
? RegionRoleType.Leader.toString()
: RegionRoleType.Follower.toString();
regionInfo.setRoleType(regionType);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 5808989e01..cffdc97c8d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -72,7 +72,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write
requests. */
@@ -237,16 +236,9 @@ public class PartitionManager {
// Map<StorageGroup, unassigned SeriesPartitionSlot count>
Map<String, Integer> unassignedDataPartitionSlotsCountMap = new
ConcurrentHashMap<>();
unassignedDataPartitionSlotsMap.forEach(
- (storageGroup, unassignedDataPartitionSlots) -> {
- AtomicInteger unassignedDataPartitionSlotsCount = new
AtomicInteger(0);
- unassignedDataPartitionSlots
- .values()
- .forEach(
- timePartitionSlots ->
-
unassignedDataPartitionSlotsCount.getAndAdd(timePartitionSlots.size()));
- unassignedDataPartitionSlotsCountMap.put(
- storageGroup, unassignedDataPartitionSlotsCount.get());
- });
+ (storageGroup, unassignedDataPartitionSlots) ->
+ unassignedDataPartitionSlotsCountMap.put(
+ storageGroup, unassignedDataPartitionSlots.size()));
TSStatus status =
extendRegionsIfNecessary(
unassignedDataPartitionSlotsCountMap,
TConsensusGroupType.DataRegion);
@@ -302,7 +294,8 @@ public class PartitionManager {
float allocatedRegionCount =
partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
// The slotCount equals to the sum of assigned slot count and
unassigned slot count
- float slotCount = partitionInfo.getSlotCount(entry.getKey()) +
entry.getValue();
+ float slotCount =
+ partitionInfo.getAssignedSeriesPartitionSlotsCount(entry.getKey())
+ entry.getValue();
float maxRegionCount =
getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(),
consensusGroupType);
float maxSlotCount =
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 7feeeb2a80..3f4e230154 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
/** Allocating new Partitions by greedy algorithm */
public class GreedyPartitionAllocator implements IPartitionAllocator {
+ private static final long TIME_PARTITION_INTERVAL =
+ ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval();
+
private final IManager configManager;
public GreedyPartitionAllocator(IManager configManager) {
@@ -63,7 +67,7 @@ public class GreedyPartitionAllocator implements
IPartitionAllocator {
// Greedy allocation
schemaPartitionMap.put(seriesPartitionSlot,
regionSlotsCounter.get(0).getRight());
// Bubble sort
- bubbleSort(regionSlotsCounter);
+ bubbleSort(regionSlotsCounter.get(0).getRight(),
regionSlotsCounter);
}
result.put(storageGroup, new
SchemaPartitionTable(schemaPartitionMap));
});
@@ -84,46 +88,91 @@ public class GreedyPartitionAllocator implements
IPartitionAllocator {
getPartitionManager()
.getSortedRegionSlotsCounter(storageGroup,
TConsensusGroupType.DataRegion);
+ DataPartitionTable dataPartitionTable = new DataPartitionTable();
+
// Enumerate SeriesPartitionSlot
- Map<TSeriesPartitionSlot, SeriesPartitionTable> dataPartitionMap =
- new ConcurrentHashMap<>();
for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>>
seriesPartitionEntry :
unassignedPartitionSlotsMap.entrySet()) {
- // Enumerate TimePartitionSlot
- Map<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap =
- new ConcurrentHashMap<>();
- for (TTimePartitionSlot timePartitionSlot :
seriesPartitionEntry.getValue()) {
+ SeriesPartitionTable seriesPartitionTable = new
SeriesPartitionTable();
+
+ // Enumerate TimePartitionSlot in ascending order
+ List<TTimePartitionSlot> timePartitionSlots =
seriesPartitionEntry.getValue();
+
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+ for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+
+ /* Check if the current DataPartition has predecessor firstly,
and inherit it if exists */
+
+ // Check if the current Partition's predecessor is allocated
+ // in the same batch of Partition creation
TConsensusGroupId predecessor =
+ seriesPartitionTable.getPrecededDataPartition(
+ timePartitionSlot, TIME_PARTITION_INTERVAL);
+ if (predecessor != null) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot,
Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
+ }
+
+ // Check if the current Partition's predecessor was allocated
+ // in the former Partition creation
+ predecessor =
getPartitionManager()
.getPrecededDataPartition(
storageGroup,
seriesPartitionEntry.getKey(),
timePartitionSlot,
-
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
+ TIME_PARTITION_INTERVAL);
if (predecessor != null) {
- // For DataPartition allocation, we consider predecessor first
- seriesPartitionMap.put(timePartitionSlot,
Collections.singletonList(predecessor));
- } else {
- // Greedy allocation
- seriesPartitionMap.put(
- timePartitionSlot,
-
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
- // Bubble sort
- bubbleSort(regionSlotsCounter);
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot,
Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
}
+
+ /* Greedy allocation */
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(
+ timePartitionSlot,
+
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
+ bubbleSort(regionSlotsCounter.get(0).getRight(),
regionSlotsCounter);
}
- dataPartitionMap.put(
- seriesPartitionEntry.getKey(), new
SeriesPartitionTable(seriesPartitionMap));
+ dataPartitionTable
+ .getDataPartitionMap()
+ .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
}
- result.put(storageGroup, new DataPartitionTable(dataPartitionMap));
+ result.put(storageGroup, dataPartitionTable);
});
return result;
}
- private void bubbleSort(List<Pair<Long, TConsensusGroupId>>
regionSlotsCounter) {
+ /**
+ * Bubble sort the regionSlotsCounter from the specified consensus group
+ *
+ * <p>Notice: Here we use bubble sort instead of other sorting algorithm is
because that, there is
+ * only one Partition allocated in each loop. Therefore, only consider one
consensus group weight
+ * change is enough
+ *
+ * @param consensusGroupId The consensus group where the new Partition is
allocated
+ * @param regionSlotsCounter List<Pair<Allocated Partition num,
TConsensusGroupId>>
+ */
+ private void bubbleSort(
+ TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>>
regionSlotsCounter) {
+ // Find the corresponding consensus group
int index = 0;
- regionSlotsCounter.get(0).setLeft(regionSlotsCounter.get(0).getLeft() + 1);
+ for (int i = 0; i < regionSlotsCounter.size(); i++) {
+ if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) {
+ index = i;
+ break;
+ }
+ }
+
+ // Do bubble sort
+
regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() +
1);
while (index < regionSlotsCounter.size() - 1
&& regionSlotsCounter.get(index).getLeft() >
regionSlotsCounter.get(index + 1).getLeft()) {
Collections.swap(regionSlotsCounter, index, index + 1);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index c757b9f8ba..7cd539b456 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -80,6 +80,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -441,7 +442,7 @@ public class PartitionInfo implements SnapshotProcessor {
/** Get region information */
public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) {
RegionInfoListResp regionResp = new RegionInfoListResp();
- List<TRegionInfo> regionInfoList = new ArrayList<>();
+ List<TRegionInfo> regionInfoList = new Vector<>();
if (storageGroupPartitionTables.isEmpty()) {
regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
regionResp.setRegionInfoList(new ArrayList<>());
@@ -455,7 +456,7 @@ public class PartitionInfo implements SnapshotProcessor {
if (storageGroups != null && !storageGroups.contains(storageGroup)) {
return;
}
- storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan,
regionInfoList);
+
regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan));
});
regionInfoList.sort(
Comparator.comparingInt(regionId ->
regionId.getConsensusGroupId().getId()));
@@ -588,8 +589,8 @@ public class PartitionInfo implements SnapshotProcessor {
return
storageGroupPartitionTables.get(storageGroup).getRegionGroupCount(type);
}
- public int getSlotCount(String storageGroup) {
- return storageGroupPartitionTables.get(storageGroup).getSlotsCount();
+ public int getAssignedSeriesPartitionSlotsCount(String storageGroup) {
+ return
storageGroupPartitionTables.get(storageGroup).getAssignedSeriesPartitionSlotsCount();
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 4f0c4298b2..b4930f078b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
@@ -28,27 +29,34 @@ import org.apache.thrift.protocol.TProtocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class RegionGroup {
private final TRegionReplicaSet replicaSet;
+ // Map<TSeriesPartitionSlot, TTimePartitionSlot Count>
+ // For SchemaRegion, each SeriesSlot constitute a SchemaPartition.
// For DataRegion, a SeriesSlot and a TimeSlot constitute a DataPartition.
// Eg: A DataRegion contains SeriesSlot-1 which has TimeSlot-1, TimeSlot-2
and Timeslot-3,
// then (SeriesSlot-1 -> TimeSlot-1) constitute a DataPartition.
- // For SchemaRegion, each SeriesSlot constitute a SchemaPartition.
- private final AtomicLong slotCount;
+ private final Map<TSeriesPartitionSlot, AtomicLong> slotCountMap;
+
+ private final AtomicLong totalTimeSlotCount;
public RegionGroup() {
this.replicaSet = new TRegionReplicaSet();
- this.slotCount = new AtomicLong();
+ this.slotCountMap = new ConcurrentHashMap<>();
+ this.totalTimeSlotCount = new AtomicLong();
}
public RegionGroup(TRegionReplicaSet replicaSet) {
this.replicaSet = replicaSet;
- this.slotCount = new AtomicLong(0);
+ this.slotCountMap = new ConcurrentHashMap<>();
+ this.totalTimeSlotCount = new AtomicLong(0);
}
public TConsensusGroupId getId() {
@@ -59,24 +67,51 @@ public class RegionGroup {
return replicaSet;
}
- public void addCounter(long delta) {
- slotCount.getAndAdd(delta);
+ /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot
Count> */
+ public void updateSlotCountMap(Map<TSeriesPartitionSlot, AtomicLong>
deltaMap) {
+ deltaMap.forEach(
+ ((seriesPartitionSlot, delta) -> {
+ slotCountMap
+ .computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0))
+ .getAndAdd(delta.get());
+ totalTimeSlotCount.getAndAdd(delta.get());
+ }));
+ }
+
+ public long getSeriesSlotCount() {
+ return slotCountMap.size();
}
- public long getCounter() {
- return slotCount.get();
+ public long getTimeSlotCount() {
+ return totalTimeSlotCount.get();
}
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
replicaSet.write(protocol);
- ReadWriteIOUtils.write(slotCount.get(), outputStream);
+
+ ReadWriteIOUtils.write(slotCountMap.size(), outputStream);
+ for (Map.Entry<TSeriesPartitionSlot, AtomicLong> slotCountEntry :
slotCountMap.entrySet()) {
+ slotCountEntry.getKey().write(protocol);
+ ReadWriteIOUtils.write(slotCountEntry.getValue().get(), outputStream);
+ }
+
+ ReadWriteIOUtils.write(totalTimeSlotCount.get(), outputStream);
}
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
replicaSet.read(protocol);
- slotCount.set(ReadWriteIOUtils.readLong(inputStream));
+
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
+ seriesPartitionSlot.read(protocol);
+ AtomicLong slotCount = new
AtomicLong(ReadWriteIOUtils.readLong(inputStream));
+ slotCountMap.put(seriesPartitionSlot, slotCount);
+ }
+
+ totalTimeSlotCount.set(ReadWriteIOUtils.readLong(inputStream));
}
@Override
@@ -84,11 +119,20 @@ public class RegionGroup {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RegionGroup that = (RegionGroup) o;
- return replicaSet.equals(that.replicaSet);
+ for (Map.Entry<TSeriesPartitionSlot, AtomicLong> slotCountEntry :
slotCountMap.entrySet()) {
+ if (!that.slotCountMap.containsKey(slotCountEntry.getKey())) {
+ return false;
+ }
+ if (slotCountEntry.getValue().get() !=
that.slotCountMap.get(slotCountEntry.getKey()).get()) {
+ return false;
+ }
+ }
+ return replicaSet.equals(that.replicaSet)
+ && totalTimeSlotCount.get() == that.totalTimeSlotCount.get();
}
@Override
public int hashCode() {
- return Objects.hash(replicaSet);
+ return Objects.hash(replicaSet, slotCountMap, totalTimeSlotCount);
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index aadabe9c1d..f527ea12f9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -51,6 +51,7 @@ import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class StorageGroupPartitionTable {
private static final Logger LOGGER =
LoggerFactory.getLogger(StorageGroupPartitionTable.class);
@@ -59,10 +60,6 @@ public class StorageGroupPartitionTable {
// The name of storage group
private String storageGroupName;
- // Total number of SeriesPartitionSlots occupied by schema,
- // determines whether a new Region needs to be created
- private final AtomicInteger seriesPartitionSlotsCount;
-
// Region
private final Map<TConsensusGroupId, RegionGroup> regionGroupMap;
// SchemaPartition
@@ -72,7 +69,6 @@ public class StorageGroupPartitionTable {
public StorageGroupPartitionTable(String storageGroupName) {
this.storageGroupName = storageGroupName;
- this.seriesPartitionSlotsCount = new AtomicInteger(0);
this.regionGroupMap = new ConcurrentHashMap<>();
@@ -155,8 +151,10 @@ public class StorageGroupPartitionTable {
return result.getAndIncrement();
}
- public int getSlotsCount() {
- return seriesPartitionSlotsCount.get();
+ public int getAssignedSeriesPartitionSlotsCount() {
+ return Math.max(
+ schemaPartitionTable.getSchemaPartitionMap().size(),
+ dataPartitionTable.getDataPartitionMap().size());
}
/**
@@ -207,17 +205,14 @@ public class StorageGroupPartitionTable {
*/
public void createSchemaPartition(SchemaPartitionTable
assignedSchemaPartition) {
// Cache assigned result
- Map<TConsensusGroupId, AtomicInteger> deltaMap =
+ // Map<TConsensusGroupId, Map<TSeriesPartitionSlot, deltaTimeSlotCount>>
+ Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap =
schemaPartitionTable.createSchemaPartition(assignedSchemaPartition);
- // Add counter
- AtomicInteger total = new AtomicInteger(0);
- deltaMap.forEach(
- ((consensusGroupId, delta) -> {
- total.getAndAdd(delta.get());
- regionGroupMap.get(consensusGroupId).addCounter(delta.get());
- }));
- seriesPartitionSlotsCount.getAndAdd(total.get());
+ // Update counter
+ groupDeltaMap.forEach(
+ ((consensusGroupId, deltaMap) ->
+
regionGroupMap.get(consensusGroupId).updateSlotCountMap(deltaMap)));
}
/**
@@ -227,16 +222,14 @@ public class StorageGroupPartitionTable {
*/
public void createDataPartition(DataPartitionTable assignedDataPartition) {
// Cache assigned result
- Map<TConsensusGroupId, AtomicInteger> deltaMap =
+ // Map<TConsensusGroupId, Map<TSeriesPartitionSlot, deltaTimeSlotCount>>
+ Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap =
dataPartitionTable.createDataPartition(assignedDataPartition);
- // Add counter
- AtomicInteger total = new AtomicInteger(0);
- deltaMap.forEach(
- ((consensusGroupId, delta) -> {
- total.getAndAdd(delta.get());
- regionGroupMap.get(consensusGroupId).addCounter(delta.get());
- }));
+ // Update counter
+ groupDeltaMap.forEach(
+ ((consensusGroupId, deltaMap) ->
+
regionGroupMap.get(consensusGroupId).updateSlotCountMap(deltaMap)));
}
/**
@@ -293,7 +286,7 @@ public class StorageGroupPartitionTable {
regionGroupMap.forEach(
(consensusGroupId, regionGroup) -> {
if (consensusGroupId.getType().equals(type)) {
- result.add(new Pair<>(regionGroup.getCounter(), consensusGroupId));
+ result.add(new Pair<>(regionGroup.getSeriesSlotCount(),
consensusGroupId));
}
});
@@ -301,51 +294,51 @@ public class StorageGroupPartitionTable {
return result;
}
- public void getRegionInfoList(
- GetRegionInfoListPlan regionsInfoPlan, List<TRegionInfo> regionInfoList)
{
+ public List<TRegionInfo> getRegionInfoList(GetRegionInfoListPlan
regionsInfoPlan) {
+ List<TRegionInfo> regionInfoList = new Vector<>();
final TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq();
+
regionGroupMap.forEach(
(consensusGroupId, regionGroup) -> {
- TRegionReplicaSet replicaSet = regionGroup.getReplicaSet();
if (showRegionReq == null || showRegionReq.getConsensusGroupType()
== null) {
- buildTRegionsInfo(regionInfoList, replicaSet, regionGroup);
- } else if
(regionsInfoPlan.getShowRegionReq().getConsensusGroupType().ordinal()
- == replicaSet.getRegionId().getType().ordinal()) {
- buildTRegionsInfo(regionInfoList, replicaSet, regionGroup);
+ regionInfoList.addAll(buildRegionInfoList(regionGroup));
+ } else if
(showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) {
+ regionInfoList.addAll(buildRegionInfoList(regionGroup));
}
});
+
+ return regionInfoList;
}
- private void buildTRegionsInfo(
- List<TRegionInfo> regionInfoList, TRegionReplicaSet replicaSet,
RegionGroup regionGroup) {
- replicaSet
+ private List<TRegionInfo> buildRegionInfoList(RegionGroup regionGroup) {
+ List<TRegionInfo> regionInfoList = new Vector<>();
+ final TConsensusGroupId regionId = regionGroup.getId();
+
+ regionGroup
+ .getReplicaSet()
.getDataNodeLocations()
.forEach(
(dataNodeLocation) -> {
TRegionInfo regionInfo = new TRegionInfo();
- regionInfo.setConsensusGroupId(replicaSet.getRegionId());
+ regionInfo.setConsensusGroupId(regionId);
regionInfo.setStorageGroup(storageGroupName);
- if (replicaSet.getRegionId().getType() ==
TConsensusGroupType.DataRegion) {
-
regionInfo.setSeriesSlots(dataPartitionTable.getDataPartitionMap().size());
- regionInfo.setTimeSlots(regionGroup.getCounter());
- } else if (replicaSet.getRegionId().getType() ==
TConsensusGroupType.SchemaRegion) {
- regionInfo.setSeriesSlots(regionGroup.getCounter());
- regionInfo.setTimeSlots(0);
- }
+ regionInfo.setSeriesSlots(regionGroup.getSeriesSlotCount());
+ regionInfo.setTimeSlots(regionGroup.getTimeSlotCount());
regionInfo.setDataNodeId(dataNodeLocation.getDataNodeId());
regionInfo.setClientRpcIp(dataNodeLocation.getClientRpcEndPoint().getIp());
regionInfo.setClientRpcPort(dataNodeLocation.getClientRpcEndPoint().getPort());
- // TODO: Wait for data migration. And then add the state
+ // TODO: Maintain Region status
regionInfo.setStatus(RegionStatus.Up.getStatus());
regionInfoList.add(regionInfo);
});
+
+ return regionInfoList;
}
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(isPredeleted, outputStream);
ReadWriteIOUtils.write(storageGroupName, outputStream);
- ReadWriteIOUtils.write(seriesPartitionSlotsCount.get(), outputStream);
ReadWriteIOUtils.write(regionGroupMap.size(), outputStream);
for (Map.Entry<TConsensusGroupId, RegionGroup> regionInfoEntry :
regionGroupMap.entrySet()) {
@@ -361,7 +354,6 @@ public class StorageGroupPartitionTable {
throws IOException, TException {
isPredeleted = ReadWriteIOUtils.readBool(inputStream);
storageGroupName = ReadWriteIOUtils.readString(inputStream);
- seriesPartitionSlotsCount.set(ReadWriteIOUtils.readInt(inputStream));
int length = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < length; i++) {
@@ -445,7 +437,7 @@ public class StorageGroupPartitionTable {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StorageGroupPartitionTable that = (StorageGroupPartitionTable) o;
- return isPredeleted == that.isPredeleted
+ return storageGroupName.equals(that.storageGroupName)
&& regionGroupMap.equals(that.regionGroupMap)
&& schemaPartitionTable.equals(that.schemaPartitionTable)
&& dataPartitionTable.equals(that.dataPartitionTable);
@@ -453,6 +445,6 @@ public class StorageGroupPartitionTable {
@Override
public int hashCode() {
- return Objects.hash(isPredeleted, regionGroupMap, schemaPartitionTable,
dataPartitionTable);
+ return Objects.hash(storageGroupName, regionGroupMap,
schemaPartitionTable, dataPartitionTable);
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
similarity index 69%
rename from
integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java
rename to
integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
index 2ce0442427..63943d2b12 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it;
+package org.apache.iotdb.db.it.confignode;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -30,6 +30,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.it.env.ConfigFactory;
@@ -54,7 +56,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-// TODO: @MiniSho Move this test into org.apache.iotdb.db.it.confignode package
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterPartitionTableTest {
@@ -68,8 +69,10 @@ public class IoTDBClusterPartitionTableTest {
private static final long testTimePartitionInterval = 86400;
private static final String sg = "root.sg";
private static final int storageGroupNum = 5;
- private static final int seriesPartitionSlotsNum = 10;
- private static final int timePartitionSlotsNum = 100;
+ private static final int seriesPartitionSlotsNum = 10000;
+ private static final int seriesPartitionBatchSize = 1000;
+ private static final int timePartitionSlotsNum = 10;
+ private static final int timePartitionBatchSize = 10;
@Before
public void setUp() throws Exception {
@@ -206,21 +209,22 @@ public class IoTDBClusterPartitionTableTest {
}
private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- constructPartitionSlotsMap() {
- final String sg = "root.sg";
+ constructPartitionSlotsMap(
+ String storageGroup,
+ int seriesSlotStart,
+ int seriesSlotEnd,
+ long timeSlotStart,
+ long timeSlotEnd) {
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result =
new HashMap<>();
-
- for (int i = 0; i < storageGroupNum; i++) {
- String storageGroup = sg + i;
- result.put(storageGroup, new HashMap<>());
- for (int j = 0; j < seriesPartitionSlotsNum; j++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
- result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- for (long k = 0; k < timePartitionSlotsNum; k++) {
- TTimePartitionSlot timePartitionSlot =
- new TTimePartitionSlot(k * testTimePartitionInterval);
-
result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
- }
+ result.put(storageGroup, new HashMap<>());
+
+ for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+ for (long j = timeSlotStart; j < timeSlotEnd; j++) {
+ TTimePartitionSlot timePartitionSlot =
+ new TTimePartitionSlot(j * testTimePartitionInterval);
+
result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
}
}
@@ -228,32 +232,36 @@ public class IoTDBClusterPartitionTableTest {
}
private void checkDataPartitionMap(
+ String storageGroup,
+ int seriesSlotStart,
+ int seriesSlotEnd,
+ long timeSlotStart,
+ long timeSlotEnd,
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>>
dataPartitionTable) {
- Assert.assertEquals(storageGroupNum, dataPartitionTable.size());
- for (int i = 0; i < storageGroupNum; i++) {
- String storageGroup = sg + i;
- Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
- Assert.assertEquals(seriesPartitionSlotsNum,
dataPartitionTable.get(storageGroup).size());
- for (int j = 0; j < seriesPartitionSlotsNum; j++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
-
Assert.assertTrue(dataPartitionTable.get(storageGroup).containsKey(seriesPartitionSlot));
- Assert.assertEquals(
- timePartitionSlotsNum,
-
dataPartitionTable.get(storageGroup).get(seriesPartitionSlot).size());
-
- Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionSlotMap =
- dataPartitionTable.get(storageGroup).get(seriesPartitionSlot);
- for (long k = 0; k < timePartitionSlotsNum; k++) {
- TTimePartitionSlot timePartitionSlot =
- new TTimePartitionSlot(k * testTimePartitionInterval);
-
Assert.assertTrue(timePartitionSlotMap.containsKey(timePartitionSlot));
- if (k > 0) {
- // Check consistency
- Assert.assertEquals(
- timePartitionSlotMap.get(new TTimePartitionSlot(0)),
- timePartitionSlotMap.get(timePartitionSlot));
- }
+
+ Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
+ seriesPartitionTable = dataPartitionTable.get(storageGroup);
+ Assert.assertEquals(seriesPartitionBatchSize, seriesPartitionTable.size());
+
+ for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot));
+ Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionTable =
+ seriesPartitionTable.get(seriesPartitionSlot);
+ Assert.assertEquals(timePartitionBatchSize, timePartitionTable.size());
+
+ for (long j = timeSlotStart; j < timeSlotEnd; j++) {
+ TTimePartitionSlot timePartitionSlot =
+ new TTimePartitionSlot(j * testTimePartitionInterval);
+ Assert.assertTrue(timePartitionTable.containsKey(timePartitionSlot));
+ if (j > timeSlotStart) {
+ // Check consistency
+ Assert.assertEquals(
+ timePartitionTable.get(
+ new TTimePartitionSlot(timeSlotStart *
testTimePartitionInterval)),
+ timePartitionTable.get(timePartitionSlot));
}
}
}
@@ -269,7 +277,7 @@ public class IoTDBClusterPartitionTableTest {
// Prepare partitionSlotsMap
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
partitionSlotsMap =
- constructPartitionSlotsMap();
+ constructPartitionSlotsMap(sg + 0, 0, 10, 0, 10);
// Set StorageGroups
for (int i = 0; i < storageGroupNum; i++) {
@@ -287,22 +295,58 @@ public class IoTDBClusterPartitionTableTest {
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
Assert.assertEquals(0,
dataPartitionTableResp.getDataPartitionTableSize());
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition
and return
- dataPartitionTableResp =
client.getOrCreateDataPartitionTable(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(dataPartitionTableResp.getDataPartitionTable());
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = sg + i;
+ for (int j = 0; j < seriesPartitionSlotsNum; j +=
seriesPartitionBatchSize) {
+ for (long k = 0; k < timePartitionSlotsNum; k +=
timePartitionBatchSize) {
+ partitionSlotsMap =
+ constructPartitionSlotsMap(
+ storageGroup, j, j + seriesPartitionBatchSize, k, k +
timePartitionBatchSize);
+
+ // Test getOrCreateDataPartition, ConfigNode should create
DataPartition and return
+ dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
+ dataPartitionTableResp =
client.getOrCreateDataPartitionTable(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ checkDataPartitionMap(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ dataPartitionTableResp.getDataPartitionTable());
+
+ // Test getDataPartition, the result should only contain
DataPartition created before
+ dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
+ dataPartitionTableResp =
client.getDataPartitionTable(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ checkDataPartitionMap(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ dataPartitionTableResp.getDataPartitionTable());
+ }
+ }
+ }
- // Test getDataPartition, the result should only contain DataPartition
created before
- dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
- dataPartitionTableResp = client.getDataPartitionTable(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- dataPartitionTableResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
- checkDataPartitionMap(dataPartitionTableResp.getDataPartitionTable());
+ // Test DataPartition inherit policy
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ // Normally, all Timeslots belonging to the same SeriesSlot
are allocated to the
+ // same DataRegionGroup
+ Assert.assertEquals(
+ regionInfo.getSeriesSlots() * timePartitionSlotsNum,
regionInfo.getTimeSlots());
+ });
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
similarity index 99%
rename from
integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
index 8bc3f7956e..ebdc7f80d4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it;
+package org.apache.iotdb.db.it.confignode;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 9322474cb4..ff3a75feb2 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -36,7 +36,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class DataPartitionTable {
@@ -119,11 +119,12 @@ public class DataPartitionTable {
* Create DataPartition within the specific StorageGroup
*
* @param assignedDataPartition Assigned result
- * @return Number of DataPartitions added to each Region
+ * @return Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta
TTimePartitionSlot Count>>
*/
- public Map<TConsensusGroupId, AtomicInteger> createDataPartition(
+ public Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
createDataPartition(
DataPartitionTable assignedDataPartition) {
- Map<TConsensusGroupId, AtomicInteger> deltaMap = new ConcurrentHashMap<>();
+ Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap =
+ new ConcurrentHashMap<>();
assignedDataPartition
.getDataPartitionMap()
@@ -131,9 +132,10 @@ public class DataPartitionTable {
((seriesPartitionSlot, seriesPartitionTable) ->
dataPartitionMap
.computeIfAbsent(seriesPartitionSlot, empty -> new
SeriesPartitionTable())
- .createDataPartition(seriesPartitionTable, deltaMap)));
+ .createDataPartition(
+ seriesPartitionTable, seriesPartitionSlot,
groupDeltaMap)));
- return deltaMap;
+ return groupDeltaMap;
}
/**
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
index e13f7bfc95..61b9189e16 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java
@@ -36,7 +36,7 @@ import java.util.Objects;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class SchemaPartitionTable {
@@ -89,23 +89,25 @@ public class SchemaPartitionTable {
* Create SchemaPartition within the specific StorageGroup
*
* @param assignedSchemaPartition assigned result
- * @return Number of SchemaPartitions added to each Region
+ * @return Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta
TTimePartitionSlot Count(always
+ * 0)>>
*/
- public Map<TConsensusGroupId, AtomicInteger> createSchemaPartition(
+ public Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
createSchemaPartition(
SchemaPartitionTable assignedSchemaPartition) {
- Map<TConsensusGroupId, AtomicInteger> deltaMap = new ConcurrentHashMap<>();
+ Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap =
+ new ConcurrentHashMap<>();
assignedSchemaPartition
.getSchemaPartitionMap()
.forEach(
((seriesPartitionSlot, consensusGroupId) -> {
schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId);
- deltaMap
- .computeIfAbsent(consensusGroupId, empty -> new
AtomicInteger(0))
- .getAndIncrement();
+ groupDeltaMap
+ .computeIfAbsent(consensusGroupId, empty -> new
ConcurrentHashMap<>())
+ .put(seriesPartitionSlot, new AtomicLong(0));
}));
- return deltaMap;
+ return groupDeltaMap;
}
/**
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index ca2d069712..ac599887c1 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -37,7 +38,7 @@ import java.util.Objects;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class SeriesPartitionTable {
@@ -112,19 +113,25 @@ public class SeriesPartitionTable {
* Create DataPartition within the specific SeriesPartitionSlot
*
* @param assignedSeriesPartitionTable Assigned result
- * @param deltaMap Number of DataPartitions added to each Region
+ * @param seriesPartitionSlot Corresponding TSeriesPartitionSlot
+ * @param groupDeltaMap Map<TConsensusGroupId, Map<TSeriesPartitionSlot,
Delta TTimePartitionSlot
+ * Count>>
*/
public void createDataPartition(
SeriesPartitionTable assignedSeriesPartitionTable,
- Map<TConsensusGroupId, AtomicInteger> deltaMap) {
+ TSeriesPartitionSlot seriesPartitionSlot,
+ Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>>
groupDeltaMap) {
assignedSeriesPartitionTable
.getSeriesPartitionMap()
.forEach(
((timePartitionSlot, consensusGroupIds) -> {
seriesPartitionMap.put(timePartitionSlot, new
Vector<>(consensusGroupIds));
- deltaMap
- .computeIfAbsent(consensusGroupIds.get(0), empty -> new
AtomicInteger(0))
- .getAndIncrement();
+ consensusGroupIds.forEach(
+ consensusGroupId ->
+ groupDeltaMap
+ .computeIfAbsent(consensusGroupId, empty -> new
ConcurrentHashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, empty -> new
AtomicLong(0))
+ .getAndIncrement());
}));
}