This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3455
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3455 by this push:
new 9929bbca3d adjust the local config node strategy to support multiple
data region, pass write and query test
9929bbca3d is described below
commit 9929bbca3d02607262db82d17c7e1c9ad37bb6ca
Author: LiuXuxin <[email protected]>
AuthorDate: Mon Aug 22 21:38:52 2022 +0800
adjust the local config node strategy to support multiple data region, pass
write and query test
---
.../resources/conf/iotdb-datanode.properties | 23 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../db/localconfignode/DataRegionIdGenerator.java | 39 +++++
.../iotdb/db/localconfignode/LocalConfigNode.java | 104 ++++++-----
.../db/localconfignode/LocalDataPartitionInfo.java | 115 +++++++++++++
.../localconfignode/LocalDataPartitionTable.java | 191 +++++++++++++--------
.../plan/scheduler/StandaloneSchedulerTest.java | 9 +-
7 files changed, 363 insertions(+), 123 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 42d67f75f0..4d5ca2f17e 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -400,12 +400,6 @@ timestamp_precision=ms
# Datatype: boolean
# enable_partial_insert=true
-# number of data regions per user-defined storage group
-# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
-# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
-# Datatype: int
-# data_region_num = 1
-
# the interval to log recover progress of each vsg when starting iotdb
# Datatype: int
# recovery_log_interval_in_ms=5000
@@ -1096,4 +1090,19 @@ trigger_forward_http_pool_size=200
# Trigger HTTP forward pool max connection for per route
trigger_forward_http_pool_max_per_route=20
# Trigger MQTT forward pool size
-trigger_forward_mqtt_pool_size=4
\ No newline at end of file
+trigger_forward_mqtt_pool_size=4
+
+
+#######################
+### LocalConfigNode ###
+#######################
+
+# number of data regions per user-defined storage group
+# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
+# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
+# Datatype: int
+# data_region_num=1
+
+# The expected maximum number of partition slots per storage group
+# Datatype: int
+# series_partition_slot_num=10000
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6376b09224..5be29793e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -889,6 +889,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"coordinator_write_executor_size",
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+ conf.setSeriesPartitionSlotNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "series_partition_slot_num",
+ Integer.toString(conf.getSeriesPartitionSlotNum()))));
// commons
commonDescriptor.loadCommonProps(properties);
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
new file mode 100644
index 0000000000..2cbd0ca0ed
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataRegionIdGenerator {
+ private static final DataRegionIdGenerator INSTANCE = new
DataRegionIdGenerator();
+ private final AtomicInteger idCounter = new AtomicInteger(0);
+
+ public static DataRegionIdGenerator getInstance() {
+ return INSTANCE;
+ }
+
+ public void setCurrentId(int id) {
+ idCounter.set(id);
+ }
+
+ public int getNextId() {
+ return idCounter.addAndGet(1);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 1d4a8956df..fb02237665 100644
---
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -120,7 +120,7 @@ public class LocalConfigNode {
private static final Logger logger =
LoggerFactory.getLogger(LocalConfigNode.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
+ public static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
private volatile boolean initialized = false;
private ScheduledExecutorService timedForceMLogThread;
@@ -134,7 +134,7 @@ public class LocalConfigNode {
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
- private final LocalDataPartitionTable dataPartitionTable =
LocalDataPartitionTable.getInstance();
+ private final LocalDataPartitionInfo dataPartitionTable =
LocalDataPartitionInfo.getInstance();
private final SeriesPartitionExecutor executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
@@ -203,7 +203,7 @@ public class LocalConfigNode {
if (config.isMppMode() && !config.isClusterMode()) {
Map<String, List<DataRegionId>> recoveredLocalDataRegionInfo =
storageEngine.getLocalDataRegionInfo();
- dataPartitionTable.init(recoveredLocalDataRegionInfo);
+ dataPartitionTable.init(null);
}
} catch (MetadataException | IOException e) {
logger.error(
@@ -857,10 +857,12 @@ public class LocalConfigNode {
* root.sg1. If there's no storage group on the given path,
StorageGroupNotSetException will be
* thrown.
*/
- public DataRegionId getBelongedDataRegionId(PartialPath path)
+ public DataRegionId getBelongedDataRegionId(
+ PartialPath path, TTimePartitionSlot timePartitionSlot)
throws MetadataException, DataRegionException {
PartialPath storageGroup =
storageGroupSchemaManager.getBelongedStorageGroup(path);
- DataRegionId dataRegionId =
dataPartitionTable.getDataRegionId(storageGroup, path);
+ DataRegionId dataRegionId =
+ dataPartitionTable.getDataRegionId(storageGroup, path,
timePartitionSlot);
if (dataRegionId == null) {
return null;
}
@@ -875,13 +877,16 @@ public class LocalConfigNode {
}
// This interface involves storage group and data region auto creation
- public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath path)
+ public DataRegionId getBelongedDataRegionIdWithAutoCreate(
+ PartialPath path, TTimePartitionSlot timePartitionSlot)
throws MetadataException, DataRegionException {
PartialPath storageGroup =
storageGroupSchemaManager.getBelongedStorageGroup(path);
- DataRegionId dataRegionId =
dataPartitionTable.getDataRegionId(storageGroup, path);
+ DataRegionId dataRegionId =
+ dataPartitionTable.getDataRegionId(storageGroup, path,
timePartitionSlot);
if (dataRegionId == null) {
- dataPartitionTable.setDataPartitionInfo(storageGroup);
- dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+ dataPartitionTable.registerStorageGroup(storageGroup);
+ dataRegionId =
+ dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, path,
timePartitionSlot);
}
DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
if (dataRegion == null) {
@@ -890,14 +895,6 @@ public class LocalConfigNode {
return dataRegionId;
}
- public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath
storageGroup) {
- return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
- }
-
- // endregion
-
- // region Interfaces for StandaloneSchemaFetcher
-
public Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
getSchemaPartition(
PathPatternTree patternTree) {
@@ -955,7 +952,9 @@ public class LocalConfigNode {
return partitionSlotsMap;
}
- // endregion
+ private List<DataRegionId> getAllRegionForOneSg(PartialPath storageGroup) {
+ return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+ }
// region Interfaces for StandalonePartitionFetcher
public DataPartition getDataPartition(
@@ -972,20 +971,41 @@ public class LocalConfigNode {
deviceToRegionsMap = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
String deviceId = dataPartitionQueryParam.getDevicePath();
- DataRegionId dataRegionId = getBelongedDataRegionId(new
PartialPath(deviceId));
- // dataRegionId is null means the DataRegion is not created,
- // use an empty dataPartitionMap to init DataPartition
- if (dataRegionId != null) {
- Map<TTimePartitionSlot, List<TRegionReplicaSet>>
timePartitionToRegionsMap =
- deviceToRegionsMap.getOrDefault(
- executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
- timePartitionToRegionsMap.put(
- new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME),
- Collections.singletonList(
- genStandaloneRegionReplicaSet(
- TConsensusGroupType.DataRegion, dataRegionId.getId())));
- deviceToRegionsMap.put(
- executor.getSeriesPartitionSlot(deviceId),
timePartitionToRegionsMap);
+ List<TTimePartitionSlot> timePartitionSlots =
+ dataPartitionQueryParam.getTimePartitionSlotList();
+ if (timePartitionSlots.size() == 0) {
+ // if there is no time slot in the query params,
+ // we return all the region ids for this storage group
+ PartialPath storageGroup =
+ storageGroupSchemaManager.getBelongedStorageGroup(new
PartialPath(deviceId));
+ List<DataRegionId> regionIdForCurrSg =
getAllRegionForOneSg(storageGroup);
+ List<TRegionReplicaSet> replicaSetList = new ArrayList<>();
+ regionIdForCurrSg.forEach(
+ x ->
+ replicaSetList.add(
+
genStandaloneRegionReplicaSet(TConsensusGroupType.DataRegion, x.getId())));
+ deviceToRegionsMap
+ .computeIfAbsent(executor.getSeriesPartitionSlot(deviceId), x ->
new HashMap<>())
+ .put(new
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME), replicaSetList);
+ continue;
+ }
+ for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+ DataRegionId dataRegionId =
+ getBelongedDataRegionId(new PartialPath(deviceId),
timePartitionSlot);
+ // dataRegionId is null means the DataRegion is not created,
+ // use an empty dataPartitionMap to init DataPartition
+ if (dataRegionId != null) {
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>>
timePartitionToRegionsMap =
+ deviceToRegionsMap.getOrDefault(
+ executor.getSeriesPartitionSlot(deviceId), new
HashMap<>());
+ timePartitionToRegionsMap.put(
+ timePartitionSlot,
+ Collections.singletonList(
+ genStandaloneRegionReplicaSet(
+ TConsensusGroupType.DataRegion,
dataRegionId.getId())));
+ deviceToRegionsMap.put(
+ executor.getSeriesPartitionSlot(deviceId),
timePartitionToRegionsMap);
+ }
}
}
if (!deviceToRegionsMap.isEmpty()) {
@@ -1028,22 +1048,22 @@ public class LocalConfigNode {
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
// for each device
String deviceId = dataPartitionQueryParam.getDevicePath();
- DataRegionId dataRegionId =
- getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
- Map<TTimePartitionSlot, List<TRegionReplicaSet>>
timePartitionToRegionsMap =
- deviceToRegionsMap.getOrDefault(
- executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
- for (TTimePartitionSlot timePartitionSlot :
- dataPartitionQueryParam.getTimePartitionSlotList()) {
- // for each time partition
+ List<TTimePartitionSlot> timePartitionSlotList =
+ dataPartitionQueryParam.getTimePartitionSlotList();
+ for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
+ DataRegionId dataRegionId =
+ getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId),
timePartitionSlot);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>>
timePartitionToRegionsMap =
+ deviceToRegionsMap.getOrDefault(
+ executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
timePartitionToRegionsMap.put(
timePartitionSlot,
Collections.singletonList(
genStandaloneRegionReplicaSet(
TConsensusGroupType.DataRegion, dataRegionId.getId())));
+ deviceToRegionsMap.put(
+ executor.getSeriesPartitionSlot(deviceId),
timePartitionToRegionsMap);
}
- deviceToRegionsMap.put(
- executor.getSeriesPartitionSlot(deviceId),
timePartitionToRegionsMap);
}
dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
new file mode 100644
index 0000000000..d267938080
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// This class is used for data partitioning; it maintains the map between storage group and
+// dataRegionIds.
+public class LocalDataPartitionInfo {
+
+ // storageGroup -> LocalDataPartitionTable
+ private Map<PartialPath, LocalDataPartitionTable> partitionTableMap;
+
+ private static class LocalDataPartitionTableHolder {
+ private static final LocalDataPartitionInfo INSTANCE = new
LocalDataPartitionInfo();
+
+ private LocalDataPartitionTableHolder() {}
+ }
+
+ private LocalDataPartitionInfo() {}
+
+ public static LocalDataPartitionInfo getInstance() {
+ return LocalDataPartitionTableHolder.INSTANCE;
+ }
+
+ public synchronized void init(ByteBuffer byteBuffer) throws
IllegalPathException {
+ partitionTableMap = new ConcurrentHashMap<>();
+ // TODO: recover partition table from input stream
+ }
+
+ public synchronized void clear() {
+ if (partitionTableMap != null) {
+ partitionTableMap.clear();
+ partitionTableMap = null;
+ }
+ }
+
+ public DataRegionId getDataRegionId(
+ PartialPath storageGroup, PartialPath path, TTimePartitionSlot
timePartitionSlot) {
+ if (!partitionTableMap.containsKey(storageGroup)) {
+ return null;
+ }
+ LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
+ Map<TTimePartitionSlot, DataRegionId> slotRegionMap = new HashMap<>();
+ if (!table.getDataRegionId(path,
Collections.singletonList(timePartitionSlot), slotRegionMap)) {
+ return null;
+ } else {
+ return slotRegionMap.get(timePartitionSlot);
+ }
+ }
+
+ /**
+ * Try to allocate a data region for the new time partition slot. This function will try to create
+ * a new data region to make expansion if the existing data regions meet some condition.
+ *
+ * @param storageGroup The path for the storage group.
+ * @param path The full path for the series.
+ * @param timePartitionSlot The time partition slot to allocate.
+ * @return The data region id for the time partition slot.
+ */
+ public DataRegionId allocateDataRegionForNewSlot(
+ PartialPath storageGroup, PartialPath path, TTimePartitionSlot
timePartitionSlot) {
+ LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
+ return table.getDataRegionWithAutoExtension(path, timePartitionSlot);
+ }
+
+ public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath
storageGroup) {
+ if (partitionTableMap.containsKey(storageGroup)) {
+ LocalDataPartitionTable partitionTable =
partitionTableMap.get(storageGroup);
+ return partitionTable.getAllDataRegionId();
+ }
+ return Collections.emptyList();
+ }
+
+ public synchronized void registerStorageGroup(PartialPath storageGroup) {
+ if (partitionTableMap.containsKey(storageGroup)) {
+ return;
+ }
+ partitionTableMap.put(storageGroup, new
LocalDataPartitionTable(storageGroup.getFullPath()));
+ }
+
+ public synchronized void deleteStorageGroup(PartialPath storageGroup) {
+ LocalDataPartitionTable partitionTable =
partitionTableMap.remove(storageGroup);
+ if (partitionTable != null) {
+ partitionTable.clear();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
index 52662e3f9d..3a8af9c407 100644
---
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -19,116 +19,163 @@
package org.apache.iotdb.db.localconfignode;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-// This class is used for data partition maintaining the map between storage
group and
-// dataRegionIds.
public class LocalDataPartitionTable {
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalDataPartitionTable.class);
- private AtomicInteger dataRegionIdGenerator;
-
- private Map<PartialPath, List<DataRegionId>> table;
+ private String storageGroupName;
+ private Map<PartialPath, SeriesPartitionTable> partitionTableMap;
+ private List<DataRegionId> regionList = new ArrayList<>();
+ private Map<DataRegionId, AtomicInteger> regionSlotCountMap = new
ConcurrentHashMap<>();
- private static class LocalDataPartitionTableHolder {
- private static final LocalDataPartitionTable INSTANCE = new
LocalDataPartitionTable();
+ public LocalDataPartitionTable() {}
- private LocalDataPartitionTableHolder() {};
+ public LocalDataPartitionTable(String storageGroupName) {
+ this.storageGroupName = storageGroupName;
+ this.partitionTableMap = new ConcurrentHashMap<>();
}
- private LocalDataPartitionTable() {}
-
- public static LocalDataPartitionTable getInstance() {
- return LocalDataPartitionTableHolder.INSTANCE;
+ public void init(ByteBuffer buffer) {
+ // TODO: init from byte buffer
}
- public synchronized void init(Map<String, List<DataRegionId>>
recoveredLocalDataRegionInfo)
- throws IllegalPathException {
- table = new ConcurrentHashMap<>();
- dataRegionIdGenerator = new AtomicInteger(0);
- for (Map.Entry<String, List<DataRegionId>> entry :
recoveredLocalDataRegionInfo.entrySet()) {
- String storageGroup = entry.getKey();
- List<DataRegionId> dataRegionIdList = new CopyOnWriteArrayList<>();
- table.put(new PartialPath(storageGroup), dataRegionIdList);
- for (DataRegionId dataRegionId :
recoveredLocalDataRegionInfo.get(storageGroup)) {
- dataRegionIdList.add(dataRegionId);
-
- if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
- dataRegionIdGenerator.set(dataRegionId.getId() + 1);
- }
- }
- }
+ public void serialize(OutputStream outputStream) {
+ // TODO: serialize the table to output stream
}
- public synchronized void clear() {
- if (table != null) {
- table.clear();
- table = null;
+ /**
+ * Get the TimePartitionSlot to DataRegionId Map. The result is stored in param slotRegionMap.
+ *
+ * @param path The full path for the series.
+ * @param timePartitionSlots The time partition slots for the series.
+ * @param slotRegionMap The map that store the result.
+ * @return Whether all the partitions exist.
+ */
+ public boolean getDataRegionId(
+ PartialPath path,
+ List<TTimePartitionSlot> timePartitionSlots,
+ Map<TTimePartitionSlot, DataRegionId> slotRegionMap) {
+ if (!partitionTableMap.containsKey(path)) {
+ return false;
}
-
- if (dataRegionIdGenerator != null) {
- dataRegionIdGenerator = null;
+ SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+ boolean allPartitionExists =
+ partitionTableMap.get(path).getDataPartition(timePartitionSlots,
seriesPartitionTable);
+ for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry :
+ seriesPartitionTable.getSeriesPartitionMap().entrySet()) {
+ if (entry.getValue().size() > 0) {
+ slotRegionMap.put(entry.getKey(), new
DataRegionId(entry.getValue().get(0).getId()));
+ }
}
+ return allPartitionExists;
}
- public synchronized void putDataRegionId(PartialPath storageGroup,
DataRegionId dataRegionId) {
- table.get(storageGroup).add(dataRegionId);
- }
-
- public synchronized void removeDataRegionId(PartialPath storageGroup,
DataRegionId dataRegionId) {
- table.get(storageGroup).remove(dataRegionId);
+ /**
+ * Get all data region ids of the current storage group.
+ *
+ * @return the data region ids as a list
+ */
+ public List<DataRegionId> getAllDataRegionId() {
+ Set<DataRegionId> regionIdSet = new HashSet<>();
+ for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
+ Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
+ partitionTable.getSeriesPartitionMap();
+ for (List<TConsensusGroupId> ids : partitionGroupIdMap.values()) {
+ ids.forEach(x -> regionIdSet.add(new DataRegionId(x.getId())));
+ }
+ }
+ return new ArrayList<>(regionIdSet);
}
- public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath
path) {
- if (!table.containsKey(storageGroup)) {
- return null;
+ public int getTimeSlotNum() {
+ int slotCount = 0;
+ for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
+ Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
+ partitionTable.getSeriesPartitionMap();
+ slotCount += partitionGroupIdMap.size();
}
- return table.get(storageGroup).get(0);
+ return slotCount;
}
- public List<DataRegionId> getInvolvedDataRegionIds(
- PartialPath storageGroup, PartialPath pathPattern, boolean
isPrefixMatch) {
- List<DataRegionId> result = new ArrayList<>();
- if (table.containsKey(storageGroup)) {
- result.addAll(table.get(storageGroup));
+ public DataRegionId getDataRegionWithAutoExtension(
+ PartialPath path, TTimePartitionSlot timePartitionSlot) {
+ DataRegionId regionId;
+ if ((regionId = checkExpansion(path, timePartitionSlot)) != null) {
+ // return the newly allocated region
+ regionSlotCountMap.get(regionId).incrementAndGet();
+ return regionId;
}
- return result;
- }
- public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath
storageGroup) {
- return table.getOrDefault(storageGroup, Collections.emptyList());
+ // select a region with min time partition slot
+ List<Pair<DataRegionId, AtomicInteger>> slotCountList = new ArrayList<>();
+ regionSlotCountMap.forEach((id, count) -> slotCountList.add(new Pair<>(id,
count)));
+ slotCountList.sort(Comparator.comparingInt(o -> o.right.get()));
+ DataRegionId chosenId = slotCountList.get(0).left;
+ regionSlotCountMap.get(chosenId).incrementAndGet();
+ return chosenId;
}
- public synchronized void setDataPartitionInfo(PartialPath storageGroup) {
- List<DataRegionId> dataRegionIdList;
- if (table.containsKey(storageGroup)) {
- dataRegionIdList = table.get(storageGroup);
- } else {
- dataRegionIdList = new CopyOnWriteArrayList<>();
+ public DataRegionId checkExpansion(PartialPath path, TTimePartitionSlot
timePartitionSlot) {
+ if (regionList.size() == 0) {
+ // there is no data region for this storage group, create one
+ return carryOutExpansion(path, timePartitionSlot);
+ }
+
+ // check the load and carry out the expansion if necessary
+ double allocatedSlotNum = getTimeSlotNum();
+ double maxRegionNum =
IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
+ double maxSlotNum =
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+ if (regionList.size() < maxRegionNum
+ && allocatedSlotNum / (double) regionList.size() > maxSlotNum /
maxRegionNum) {
+ return carryOutExpansion(path, timePartitionSlot);
}
- dataRegionIdList.add(generateDataRegionId());
- table.put(storageGroup, dataRegionIdList);
+ return null;
}
- public synchronized List<DataRegionId> deleteStorageGroup(PartialPath
storageGroup) {
- if (table.containsKey(storageGroup)) {
- return table.remove(storageGroup);
+ private DataRegionId carryOutExpansion(PartialPath path, TTimePartitionSlot
timePartitionSlot) {
+ if (!partitionTableMap.containsKey(path)) {
+ partitionTableMap.put(path, new SeriesPartitionTable());
}
- return Collections.emptyList();
+ SeriesPartitionTable seriesPartitionTable = partitionTableMap.get(path);
+ Map<TTimePartitionSlot, List<TConsensusGroupId>> allotmentMap = new
HashMap<>();
+ int nextRegionId = DataRegionIdGenerator.getInstance().getNextId();
+ allotmentMap.put(
+ timePartitionSlot,
+ Collections.singletonList(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion,
nextRegionId)));
+ SeriesPartitionTable requestTable = new SeriesPartitionTable(allotmentMap);
+ Map<TConsensusGroupId, AtomicInteger> deltaMap = new HashMap<>();
+ seriesPartitionTable.createDataPartition(requestTable, deltaMap);
+ regionList.add(new DataRegionId(nextRegionId));
+ regionSlotCountMap.put(new DataRegionId(nextRegionId), new
AtomicInteger(0));
+ return new DataRegionId(nextRegionId);
}
- // This method may be extended to implement multi dataRegion for one
storageGroup
- // todo keep consistent with the partition method of config node in new
cluster
- private DataRegionId generateDataRegionId() {
- return new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+ public void clear() {
+ // TODO: clear the table
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 34e2a9e85f..3afef548f5 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -71,6 +72,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import static
org.apache.iotdb.db.localconfignode.LocalConfigNode.STANDALONE_MOCK_TIME_SLOT_START_TIME;
+
public class StandaloneSchedulerTest {
private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
@@ -406,7 +409,8 @@ public class StandaloneSchedulerTest {
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new
PartialPath(deviceId));
- configNode.getBelongedDataRegionIdWithAutoCreate(new
PartialPath(deviceId));
+ configNode.getBelongedDataRegionIdWithAutoCreate(
+ new PartialPath(deviceId), new
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
MPPQueryContext context =
new MPPQueryContext(
"",
@@ -487,7 +491,8 @@ public class StandaloneSchedulerTest {
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
- configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
+ configNode.getBelongedDataRegionIdWithAutoCreate(
+ deviceId, new
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
MPPQueryContext context =
new MPPQueryContext(
"",