This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3455
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3455 by this push:
     new 9929bbca3d adjust the local config node strategy to support multiple 
data region, pass write and query test
9929bbca3d is described below

commit 9929bbca3d02607262db82d17c7e1c9ad37bb6ca
Author: LiuXuxin <[email protected]>
AuthorDate: Mon Aug 22 21:38:52 2022 +0800

    adjust the local config node strategy to support multiple data region, pass 
write and query test
---
 .../resources/conf/iotdb-datanode.properties       |  23 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../db/localconfignode/DataRegionIdGenerator.java  |  39 +++++
 .../iotdb/db/localconfignode/LocalConfigNode.java  | 104 ++++++-----
 .../db/localconfignode/LocalDataPartitionInfo.java | 115 +++++++++++++
 .../localconfignode/LocalDataPartitionTable.java   | 191 +++++++++++++--------
 .../plan/scheduler/StandaloneSchedulerTest.java    |   9 +-
 7 files changed, 363 insertions(+), 123 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties 
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 42d67f75f0..4d5ca2f17e 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -400,12 +400,6 @@ timestamp_precision=ms
 # Datatype: boolean
 # enable_partial_insert=true
 
-# number of data regions per user-defined storage group
-# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
-# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
-# Datatype: int
-# data_region_num = 1
-
 # the interval to log recover progress of each vsg when starting iotdb
 # Datatype: int
 # recovery_log_interval_in_ms=5000
@@ -1096,4 +1090,19 @@ trigger_forward_http_pool_size=200
 # Trigger HTTP forward pool max connection for per route
 trigger_forward_http_pool_max_per_route=20
 # Trigger MQTT forward pool size
-trigger_forward_mqtt_pool_size=4
\ No newline at end of file
+trigger_forward_mqtt_pool_size=4
+
+
+#######################
+### LocalConfigNode ###
+#######################
+
+# number of data regions per user-defined storage group
+# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
+# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
+# Datatype: int
+# data_region_num=1
+
+# The expected maximum number of series partition slots per storage group
+# Datatype: int
+# series_partition_slot_num=10000
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6376b09224..5be29793e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -889,6 +889,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "coordinator_write_executor_size",
                   Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+      conf.setSeriesPartitionSlotNum(
+          Integer.parseInt(
+              properties.getProperty(
+                  "series_partition_slot_num",
+                  Integer.toString(conf.getSeriesPartitionSlotNum()))));
 
       // commons
       commonDescriptor.loadCommonProps(properties);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
new file mode 100644
index 0000000000..2cbd0ca0ed
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/DataRegionIdGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataRegionIdGenerator {
+  private static final DataRegionIdGenerator INSTANCE = new 
DataRegionIdGenerator();
+  private final AtomicInteger idCounter = new AtomicInteger(0);
+
+  public static DataRegionIdGenerator getInstance() {
+    return INSTANCE;
+  }
+
+  public void setCurrentId(int id) {
+    idCounter.set(id);
+  }
+
+  public int getNextId() {
+    return idCounter.addAndGet(1);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 1d4a8956df..fb02237665 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -120,7 +120,7 @@ public class LocalConfigNode {
   private static final Logger logger = 
LoggerFactory.getLogger(LocalConfigNode.class);
 
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
-  private static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
+  public static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
   private volatile boolean initialized = false;
 
   private ScheduledExecutorService timedForceMLogThread;
@@ -134,7 +134,7 @@ public class LocalConfigNode {
 
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
 
-  private final LocalDataPartitionTable dataPartitionTable = 
LocalDataPartitionTable.getInstance();
+  private final LocalDataPartitionInfo dataPartitionTable = 
LocalDataPartitionInfo.getInstance();
 
   private final SeriesPartitionExecutor executor =
       SeriesPartitionExecutor.getSeriesPartitionExecutor(
@@ -203,7 +203,7 @@ public class LocalConfigNode {
       if (config.isMppMode() && !config.isClusterMode()) {
         Map<String, List<DataRegionId>> recoveredLocalDataRegionInfo =
             storageEngine.getLocalDataRegionInfo();
-        dataPartitionTable.init(recoveredLocalDataRegionInfo);
+        dataPartitionTable.init(null);
       }
     } catch (MetadataException | IOException e) {
       logger.error(
@@ -857,10 +857,12 @@ public class LocalConfigNode {
    * root.sg1. If there's no storage group on the given path, 
StorageGroupNotSetException will be
    * thrown.
    */
-  public DataRegionId getBelongedDataRegionId(PartialPath path)
+  public DataRegionId getBelongedDataRegionId(
+      PartialPath path, TTimePartitionSlot timePartitionSlot)
       throws MetadataException, DataRegionException {
     PartialPath storageGroup = 
storageGroupSchemaManager.getBelongedStorageGroup(path);
-    DataRegionId dataRegionId = 
dataPartitionTable.getDataRegionId(storageGroup, path);
+    DataRegionId dataRegionId =
+        dataPartitionTable.getDataRegionId(storageGroup, path, 
timePartitionSlot);
     if (dataRegionId == null) {
       return null;
     }
@@ -875,13 +877,16 @@ public class LocalConfigNode {
   }
 
   // This interface involves storage group and data region auto creation
-  public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath path)
+  public DataRegionId getBelongedDataRegionIdWithAutoCreate(
+      PartialPath path, TTimePartitionSlot timePartitionSlot)
       throws MetadataException, DataRegionException {
     PartialPath storageGroup = 
storageGroupSchemaManager.getBelongedStorageGroup(path);
-    DataRegionId dataRegionId = 
dataPartitionTable.getDataRegionId(storageGroup, path);
+    DataRegionId dataRegionId =
+        dataPartitionTable.getDataRegionId(storageGroup, path, 
timePartitionSlot);
     if (dataRegionId == null) {
-      dataPartitionTable.setDataPartitionInfo(storageGroup);
-      dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+      dataPartitionTable.registerStorageGroup(storageGroup);
+      dataRegionId =
+          dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, path, 
timePartitionSlot);
     }
     DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
     if (dataRegion == null) {
@@ -890,14 +895,6 @@ public class LocalConfigNode {
     return dataRegionId;
   }
 
-  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath 
storageGroup) {
-    return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
-  }
-
-  // endregion
-
-  // region Interfaces for StandaloneSchemaFetcher
-
   public Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> 
getSchemaPartition(
       PathPatternTree patternTree) {
 
@@ -955,7 +952,9 @@ public class LocalConfigNode {
     return partitionSlotsMap;
   }
 
-  // endregion
+  private List<DataRegionId> getAllRegionForOneSg(PartialPath storageGroup) {
+    return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+  }
 
   // region Interfaces for StandalonePartitionFetcher
   public DataPartition getDataPartition(
@@ -972,20 +971,41 @@ public class LocalConfigNode {
           deviceToRegionsMap = new HashMap<>();
       for (DataPartitionQueryParam dataPartitionQueryParam : 
dataPartitionQueryParams) {
         String deviceId = dataPartitionQueryParam.getDevicePath();
-        DataRegionId dataRegionId = getBelongedDataRegionId(new 
PartialPath(deviceId));
-        // dataRegionId is null means the DataRegion is not created,
-        // use an empty dataPartitionMap to init DataPartition
-        if (dataRegionId != null) {
-          Map<TTimePartitionSlot, List<TRegionReplicaSet>> 
timePartitionToRegionsMap =
-              deviceToRegionsMap.getOrDefault(
-                  executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
-          timePartitionToRegionsMap.put(
-              new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME),
-              Collections.singletonList(
-                  genStandaloneRegionReplicaSet(
-                      TConsensusGroupType.DataRegion, dataRegionId.getId())));
-          deviceToRegionsMap.put(
-              executor.getSeriesPartitionSlot(deviceId), 
timePartitionToRegionsMap);
+        List<TTimePartitionSlot> timePartitionSlots =
+            dataPartitionQueryParam.getTimePartitionSlotList();
+        if (timePartitionSlots.size() == 0) {
+          // if there are no time slots in the query params,
+          // we return all the region ids for this sg
+          PartialPath storageGroup =
+              storageGroupSchemaManager.getBelongedStorageGroup(new 
PartialPath(deviceId));
+          List<DataRegionId> regionIdForCurrSg = 
getAllRegionForOneSg(storageGroup);
+          List<TRegionReplicaSet> replicaSetList = new ArrayList<>();
+          regionIdForCurrSg.forEach(
+              x ->
+                  replicaSetList.add(
+                      
genStandaloneRegionReplicaSet(TConsensusGroupType.DataRegion, x.getId())));
+          deviceToRegionsMap
+              .computeIfAbsent(executor.getSeriesPartitionSlot(deviceId), x -> 
new HashMap<>())
+              .put(new 
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME), replicaSetList);
+          continue;
+        }
+        for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+          DataRegionId dataRegionId =
+              getBelongedDataRegionId(new PartialPath(deviceId), 
timePartitionSlot);
+          // dataRegionId is null means the DataRegion is not created,
+          // use an empty dataPartitionMap to init DataPartition
+          if (dataRegionId != null) {
+            Map<TTimePartitionSlot, List<TRegionReplicaSet>> 
timePartitionToRegionsMap =
+                deviceToRegionsMap.getOrDefault(
+                    executor.getSeriesPartitionSlot(deviceId), new 
HashMap<>());
+            timePartitionToRegionsMap.put(
+                timePartitionSlot,
+                Collections.singletonList(
+                    genStandaloneRegionReplicaSet(
+                        TConsensusGroupType.DataRegion, 
dataRegionId.getId())));
+            deviceToRegionsMap.put(
+                executor.getSeriesPartitionSlot(deviceId), 
timePartitionToRegionsMap);
+          }
         }
       }
       if (!deviceToRegionsMap.isEmpty()) {
@@ -1028,22 +1048,22 @@ public class LocalConfigNode {
       for (DataPartitionQueryParam dataPartitionQueryParam : 
dataPartitionQueryParams) {
         // for each device
         String deviceId = dataPartitionQueryParam.getDevicePath();
-        DataRegionId dataRegionId =
-            getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
-        Map<TTimePartitionSlot, List<TRegionReplicaSet>> 
timePartitionToRegionsMap =
-            deviceToRegionsMap.getOrDefault(
-                executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
-        for (TTimePartitionSlot timePartitionSlot :
-            dataPartitionQueryParam.getTimePartitionSlotList()) {
-          // for each time partition
+        List<TTimePartitionSlot> timePartitionSlotList =
+            dataPartitionQueryParam.getTimePartitionSlotList();
+        for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
+          DataRegionId dataRegionId =
+              getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId), 
timePartitionSlot);
+          Map<TTimePartitionSlot, List<TRegionReplicaSet>> 
timePartitionToRegionsMap =
+              deviceToRegionsMap.getOrDefault(
+                  executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
           timePartitionToRegionsMap.put(
               timePartitionSlot,
               Collections.singletonList(
                   genStandaloneRegionReplicaSet(
                       TConsensusGroupType.DataRegion, dataRegionId.getId())));
+          deviceToRegionsMap.put(
+              executor.getSeriesPartitionSlot(deviceId), 
timePartitionToRegionsMap);
         }
-        deviceToRegionsMap.put(
-            executor.getSeriesPartitionSlot(deviceId), 
timePartitionToRegionsMap);
       }
       dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
new file mode 100644
index 0000000000..d267938080
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.localconfignode;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// This class is used for data partition, maintaining the map between each storage group and
+// its dataRegionIds.
+public class LocalDataPartitionInfo {
+
+  // storageGroup -> LocalDataPartitionTable
+  private Map<PartialPath, LocalDataPartitionTable> partitionTableMap;
+
+  private static class LocalDataPartitionTableHolder {
+    private static final LocalDataPartitionInfo INSTANCE = new 
LocalDataPartitionInfo();
+
+    private LocalDataPartitionTableHolder() {}
+  }
+
+  private LocalDataPartitionInfo() {}
+
+  public static LocalDataPartitionInfo getInstance() {
+    return LocalDataPartitionTableHolder.INSTANCE;
+  }
+
+  public synchronized void init(ByteBuffer byteBuffer) throws 
IllegalPathException {
+    partitionTableMap = new ConcurrentHashMap<>();
+    // TODO: recover partition table from input stream
+  }
+
+  public synchronized void clear() {
+    if (partitionTableMap != null) {
+      partitionTableMap.clear();
+      partitionTableMap = null;
+    }
+  }
+
+  public DataRegionId getDataRegionId(
+      PartialPath storageGroup, PartialPath path, TTimePartitionSlot 
timePartitionSlot) {
+    if (!partitionTableMap.containsKey(storageGroup)) {
+      return null;
+    }
+    LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
+    Map<TTimePartitionSlot, DataRegionId> slotRegionMap = new HashMap<>();
+    if (!table.getDataRegionId(path, 
Collections.singletonList(timePartitionSlot), slotRegionMap)) {
+      return null;
+    } else {
+      return slotRegionMap.get(timePartitionSlot);
+    }
+  }
+
+  /**
+   * Try to allocate a data region for the new time partition slot. This function will try to create
+   * a new data region (an expansion) if the existing data regions meet the expansion condition.
+   *
+   * @param storageGroup The path for the storage group.
+   * @param path The full path for the series.
+   * @param timePartitionSlot The time partition slot to allocate.
+   * @return The data region id for the time partition slot.
+   */
+  public DataRegionId allocateDataRegionForNewSlot(
+      PartialPath storageGroup, PartialPath path, TTimePartitionSlot 
timePartitionSlot) {
+    LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
+    return table.getDataRegionWithAutoExtension(path, timePartitionSlot);
+  }
+
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath 
storageGroup) {
+    if (partitionTableMap.containsKey(storageGroup)) {
+      LocalDataPartitionTable partitionTable = 
partitionTableMap.get(storageGroup);
+      return partitionTable.getAllDataRegionId();
+    }
+    return Collections.emptyList();
+  }
+
+  public synchronized void registerStorageGroup(PartialPath storageGroup) {
+    if (partitionTableMap.containsKey(storageGroup)) {
+      return;
+    }
+    partitionTableMap.put(storageGroup, new 
LocalDataPartitionTable(storageGroup.getFullPath()));
+  }
+
+  public synchronized void deleteStorageGroup(PartialPath storageGroup) {
+    LocalDataPartitionTable partitionTable = 
partitionTableMap.remove(storageGroup);
+    if (partitionTable != null) {
+      partitionTable.clear();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
index 52662e3f9d..3a8af9c407 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -19,116 +19,163 @@
 
 package org.apache.iotdb.db.localconfignode;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-// This class is used for data partition maintaining the map between storage 
group and
-// dataRegionIds.
 public class LocalDataPartitionTable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalDataPartitionTable.class);
 
-  private AtomicInteger dataRegionIdGenerator;
-
-  private Map<PartialPath, List<DataRegionId>> table;
+  private String storageGroupName;
+  private Map<PartialPath, SeriesPartitionTable> partitionTableMap;
+  private List<DataRegionId> regionList = new ArrayList<>();
+  private Map<DataRegionId, AtomicInteger> regionSlotCountMap = new 
ConcurrentHashMap<>();
 
-  private static class LocalDataPartitionTableHolder {
-    private static final LocalDataPartitionTable INSTANCE = new 
LocalDataPartitionTable();
+  public LocalDataPartitionTable() {}
 
-    private LocalDataPartitionTableHolder() {};
+  public LocalDataPartitionTable(String storageGroupName) {
+    this.storageGroupName = storageGroupName;
+    this.partitionTableMap = new ConcurrentHashMap<>();
   }
 
-  private LocalDataPartitionTable() {}
-
-  public static LocalDataPartitionTable getInstance() {
-    return LocalDataPartitionTableHolder.INSTANCE;
+  public void init(ByteBuffer buffer) {
+    // TODO: init from byte buffer
   }
 
-  public synchronized void init(Map<String, List<DataRegionId>> 
recoveredLocalDataRegionInfo)
-      throws IllegalPathException {
-    table = new ConcurrentHashMap<>();
-    dataRegionIdGenerator = new AtomicInteger(0);
-    for (Map.Entry<String, List<DataRegionId>> entry : 
recoveredLocalDataRegionInfo.entrySet()) {
-      String storageGroup = entry.getKey();
-      List<DataRegionId> dataRegionIdList = new CopyOnWriteArrayList<>();
-      table.put(new PartialPath(storageGroup), dataRegionIdList);
-      for (DataRegionId dataRegionId : 
recoveredLocalDataRegionInfo.get(storageGroup)) {
-        dataRegionIdList.add(dataRegionId);
-
-        if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
-          dataRegionIdGenerator.set(dataRegionId.getId() + 1);
-        }
-      }
-    }
+  public void serialize(OutputStream outputStream) {
+    // TODO: serialize the table to output stream
   }
 
-  public synchronized void clear() {
-    if (table != null) {
-      table.clear();
-      table = null;
+  /**
+   * Get the TimePartitionSlot to DataRegionId Map. The result is stored in param slotRegionMap.
+   *
+   * @param path The full path for the series.
+   * @param timePartitionSlots The time partition slots for the series.
+   * @param slotRegionMap The map that store the result.
+   * @return Whether all the partitions exist.
+   */
+  public boolean getDataRegionId(
+      PartialPath path,
+      List<TTimePartitionSlot> timePartitionSlots,
+      Map<TTimePartitionSlot, DataRegionId> slotRegionMap) {
+    if (!partitionTableMap.containsKey(path)) {
+      return false;
     }
-
-    if (dataRegionIdGenerator != null) {
-      dataRegionIdGenerator = null;
+    SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+    boolean allPartitionExists =
+        partitionTableMap.get(path).getDataPartition(timePartitionSlots, 
seriesPartitionTable);
+    for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry :
+        seriesPartitionTable.getSeriesPartitionMap().entrySet()) {
+      if (entry.getValue().size() > 0) {
+        slotRegionMap.put(entry.getKey(), new 
DataRegionId(entry.getValue().get(0).getId()));
+      }
     }
+    return allPartitionExists;
   }
 
-  public synchronized void putDataRegionId(PartialPath storageGroup, 
DataRegionId dataRegionId) {
-    table.get(storageGroup).add(dataRegionId);
-  }
-
-  public synchronized void removeDataRegionId(PartialPath storageGroup, 
DataRegionId dataRegionId) {
-    table.get(storageGroup).remove(dataRegionId);
+  /**
+   * Get all data region ids of the current storage group.
+   *
+   * @return the data region ids in a list
+   */
+  public List<DataRegionId> getAllDataRegionId() {
+    Set<DataRegionId> regionIdSet = new HashSet<>();
+    for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
+      Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
+          partitionTable.getSeriesPartitionMap();
+      for (List<TConsensusGroupId> ids : partitionGroupIdMap.values()) {
+        ids.forEach(x -> regionIdSet.add(new DataRegionId(x.getId())));
+      }
+    }
+    return new ArrayList<>(regionIdSet);
   }
 
-  public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath 
path) {
-    if (!table.containsKey(storageGroup)) {
-      return null;
+  public int getTimeSlotNum() {
+    int slotCount = 0;
+    for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
+      Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
+          partitionTable.getSeriesPartitionMap();
+      slotCount += partitionGroupIdMap.size();
     }
-    return table.get(storageGroup).get(0);
+    return slotCount;
   }
 
-  public List<DataRegionId> getInvolvedDataRegionIds(
-      PartialPath storageGroup, PartialPath pathPattern, boolean 
isPrefixMatch) {
-    List<DataRegionId> result = new ArrayList<>();
-    if (table.containsKey(storageGroup)) {
-      result.addAll(table.get(storageGroup));
+  public DataRegionId getDataRegionWithAutoExtension(
+      PartialPath path, TTimePartitionSlot timePartitionSlot) {
+    DataRegionId regionId;
+    if ((regionId = checkExpansion(path, timePartitionSlot)) != null) {
+      // return the newly allocated region
+      regionSlotCountMap.get(regionId).incrementAndGet();
+      return regionId;
     }
-    return result;
-  }
 
-  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath 
storageGroup) {
-    return table.getOrDefault(storageGroup, Collections.emptyList());
+    // select the region with the fewest time partition slots
+    List<Pair<DataRegionId, AtomicInteger>> slotCountList = new ArrayList<>();
+    regionSlotCountMap.forEach((id, count) -> slotCountList.add(new Pair<>(id, 
count)));
+    slotCountList.sort(Comparator.comparingInt(o -> o.right.get()));
+    DataRegionId chosenId = slotCountList.get(0).left;
+    regionSlotCountMap.get(chosenId).incrementAndGet();
+    return chosenId;
   }
 
-  public synchronized void setDataPartitionInfo(PartialPath storageGroup) {
-    List<DataRegionId> dataRegionIdList;
-    if (table.containsKey(storageGroup)) {
-      dataRegionIdList = table.get(storageGroup);
-    } else {
-      dataRegionIdList = new CopyOnWriteArrayList<>();
+  public DataRegionId checkExpansion(PartialPath path, TTimePartitionSlot 
timePartitionSlot) {
+    if (regionList.size() == 0) {
+      // there is no data region for this storage group, create one
+      return carryOutExpansion(path, timePartitionSlot);
+    }
+
+    // check the load and carry out the expansion if necessary
+    double allocatedSlotNum = getTimeSlotNum();
+    double maxRegionNum = 
IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
+    double maxSlotNum = 
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+    if (regionList.size() < maxRegionNum
+        && allocatedSlotNum / (double) regionList.size() > maxSlotNum / 
maxRegionNum) {
+      return carryOutExpansion(path, timePartitionSlot);
     }
-    dataRegionIdList.add(generateDataRegionId());
-    table.put(storageGroup, dataRegionIdList);
+    return null;
   }
 
-  public synchronized List<DataRegionId> deleteStorageGroup(PartialPath 
storageGroup) {
-    if (table.containsKey(storageGroup)) {
-      return table.remove(storageGroup);
+  private DataRegionId carryOutExpansion(PartialPath path, TTimePartitionSlot 
timePartitionSlot) {
+    if (!partitionTableMap.containsKey(path)) {
+      partitionTableMap.put(path, new SeriesPartitionTable());
     }
-    return Collections.emptyList();
+    SeriesPartitionTable seriesPartitionTable = partitionTableMap.get(path);
+    Map<TTimePartitionSlot, List<TConsensusGroupId>> allotmentMap = new 
HashMap<>();
+    int nextRegionId = DataRegionIdGenerator.getInstance().getNextId();
+    allotmentMap.put(
+        timePartitionSlot,
+        Collections.singletonList(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 
nextRegionId)));
+    SeriesPartitionTable requestTable = new SeriesPartitionTable(allotmentMap);
+    Map<TConsensusGroupId, AtomicInteger> deltaMap = new HashMap<>();
+    seriesPartitionTable.createDataPartition(requestTable, deltaMap);
+    regionList.add(new DataRegionId(nextRegionId));
+    regionSlotCountMap.put(new DataRegionId(nextRegionId), new 
AtomicInteger(0));
+    return new DataRegionId(nextRegionId);
   }
 
-  // This method may be extended to implement multi dataRegion for one 
storageGroup
-  // todo keep consistent with the partition method of config node in new 
cluster
-  private DataRegionId generateDataRegionId() {
-    return new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+  public void clear() {
+    // TODO: clear the table
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 34e2a9e85f..3afef548f5 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -71,6 +72,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import static 
org.apache.iotdb.db.localconfignode.LocalConfigNode.STANDALONE_MOCK_TIME_SLOT_START_TIME;
+
 public class StandaloneSchedulerTest {
   private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
 
@@ -406,7 +409,8 @@ public class StandaloneSchedulerTest {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new 
PartialPath(deviceId));
-    configNode.getBelongedDataRegionIdWithAutoCreate(new 
PartialPath(deviceId));
+    configNode.getBelongedDataRegionIdWithAutoCreate(
+        new PartialPath(deviceId), new 
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
     MPPQueryContext context =
         new MPPQueryContext(
             "",
@@ -487,7 +491,8 @@ public class StandaloneSchedulerTest {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
-    configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
+    configNode.getBelongedDataRegionIdWithAutoCreate(
+        deviceId, new 
TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
     MPPQueryContext context =
         new MPPQueryContext(
             "",

Reply via email to