This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this push:
     new c69f9704e6f adding DataRegionTest for insertRow
c69f9704e6f is described below

commit c69f9704e6f65e1be3b3d5767da7c1ab9a28a5ca
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 8 12:35:57 2024 +0800

    adding DataRegionTest for insertRow
---
 .../planner/plan/node/write/InsertRowNode.java     |  2 +-
 .../planner/plan/node/write/InsertRowsNode.java    |  3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java | 26 ++++++++---
 .../plan/relational/planner/RelationPlanner.java   |  1 +
 .../plan/statement/crud/InsertBaseStatement.java   | 13 ++++++
 .../plan/statement/crud/InsertRowStatement.java    |  9 ----
 .../plan/statement/crud/InsertTabletStatement.java | 10 -----
 .../dataregion/memtable/TsFileProcessor.java       | 29 ++++++------
 .../plan/relational/analyzer/AnalyzerTest.java     |  5 +--
 .../plan/statement/StatementTestUtils.java         | 24 ++++++++++
 .../storageengine/dataregion/DataRegionTest.java   | 52 ++++++++++++++++++++++
 .../iotdb/commons/partition/DataPartition.java     | 25 ++++++-----
 12 files changed, 142 insertions(+), 57 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 16138cafa2e..0635f30c80c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -113,7 +113,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         analysis
             .getDataPartitionInfo()
             .getDataRegionReplicaSetForWriting(
-                devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot);
+                getDeviceID(), timePartitionSlot, analysis.getDatabaseName());
     // collect redirectInfo
     analysis.setRedirectNodeList(
         Collections.singletonList(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 958d04c195c..820897565a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -238,7 +238,8 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
               .getDataPartitionInfo()
               .getDataRegionReplicaSetForWriting(
                   insertRowNode.devicePath.getIDeviceIDAsFullDevice(),
-                  
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
+                  
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
+                  analysis.getDatabaseName());
       // Collect redirectInfo
       
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
       InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 69c99373360..dcbe301f2b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -65,10 +65,14 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
    */
   private List<Integer> insertRowNodeIndexList;
 
-  /** the InsertRowsNode list */
+  /**
+   * the InsertRowsNode list
+   */
   private List<InsertRowNode> insertRowNodeList;
 
-  /** record the result of insert rows */
+  /**
+   * record the result of insert rows
+   */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
   public InsertRowsOfOneDeviceNode(PlanNodeId id) {
@@ -127,7 +131,8 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+  }
 
   @Override
   public PlanNodeType getType() {
@@ -165,7 +170,8 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
           analysis
               .getDataPartitionInfo()
               .getDataRegionReplicaSetForWriting(
-                  devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot);
+                  devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot,
+                  analysis.getDatabaseName());
       Map<TTimePartitionSlot, List<InsertRowNode>> tmpMap =
           splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new HashMap<>());
       Map<TTimePartitionSlot, List<Integer>> tmpIndexMap =
@@ -302,9 +308,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     InsertRowsOfOneDeviceNode that = (InsertRowsOfOneDeviceNode) o;
     return Objects.equals(insertRowNodeIndexList, that.insertRowNodeIndexList)
         && Objects.equals(insertRowNodeList, that.insertRowNodeList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 8c40135fd56..da60edf80f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -213,6 +213,7 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     return new RelationPlan(insertNode, analysis.getRootScope(), 
Collections.emptyList());
   }
 
+  @Override
   protected RelationPlan visitInsertRow(InsertRow node, Void context) {
     InsertRowStatement insertRowStatement = node.getInnerTreeStatement();
     RelationalInsertRowNode insertNode =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index fcccc7df97b..e1eda31d7b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -75,6 +75,7 @@ public abstract class InsertBaseStatement extends Statement {
   protected TsTableColumnCategory[] columnCategories;
   protected List<Integer> idColumnIndices;
   protected List<Integer> attrColumnIndices;
+  protected boolean writeToTable = false;
 
   // region params used by analyzing logical views.
 
@@ -480,5 +481,17 @@ public abstract class InsertBaseStatement extends 
Statement {
     CommonUtils.swapArray(columnCategories, src, target);
     idColumnIndices = null;
   }
+
+  public boolean isWriteToTable() {
+    return writeToTable;
+  }
+
+  public void setWriteToTable(boolean writeToTable) {
+    this.writeToTable = writeToTable;
+    if (writeToTable) {
+      isAligned = true;
+    }
+  }
+
   // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 29579f8e11e..be8c38d1107 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -76,7 +76,6 @@ public class InsertRowStatement extends InsertBaseStatement 
implements ISchemaVa
    */
   private boolean[] measurementIsAligned;
 
-  private boolean isWriteToTable = false;
   private IDeviceID deviceID;
 
   public InsertRowStatement() {
@@ -452,14 +451,6 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
 
-  public boolean isWriteToTable() {
-    return isWriteToTable;
-  }
-
-  public void setWriteToTable(boolean writeToTable) {
-    isWriteToTable = writeToTable;
-  }
-
   public IDeviceID getTableDeviceID() {
     if (deviceID == null) {
       String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 5ad2c48cf14..dff1e992c9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -79,8 +79,6 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
    */
   private boolean[] measurementIsAligned;
 
-  private boolean writeToTable = false;
-
   public InsertTabletStatement() {
     super();
     statementType = StatementType.BATCH_INSERT;
@@ -441,14 +439,6 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
 
-  public boolean isWriteToTable() {
-    return writeToTable;
-  }
-
-  public void setWriteToTable(boolean writeToTable) {
-    this.writeToTable = writeToTable;
-  }
-
   @Override
   public Statement toRelationalStatement(MPPQueryContext context) {
     return new InsertTablet(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 514f99c6163..c16f03424a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -250,14 +250,7 @@ public class TsFileProcessor {
     logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile());
   }
 
-  /**
-   * Insert data in an InsertRowNode into the workingMemtable.
-   *
-   * @param insertRowNode physical plan of insertion
-   */
-  public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
-      throws WriteProcessException {
-
+  private void ensureMemTable(long[] costsForMetrics) {
     if (workMemTable == null) {
       long startTime = System.nanoTime();
       createNewWorkingMemTable();
@@ -267,6 +260,17 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
+  }
+
+  /**
+   * Insert data in an InsertRowNode into the workingMemtable.
+   *
+   * @param insertRowNode physical plan of insertion
+   */
+  public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
+      throws WriteProcessException {
+
+    ensureMemTable(costsForMetrics);
     long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
@@ -338,14 +342,7 @@ public class TsFileProcessor {
   public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics)
       throws WriteProcessException {
 
-    if (workMemTable == null) {
-      long startTime = System.nanoTime();
-      createNewWorkingMemTable();
-      // recordCreateMemtableBlockCost
-      costsForMetrics[0] += System.nanoTime() - startTime;
-      WritingMetrics.getInstance()
-          
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
-    }
+    ensureMemTable(costsForMetrics);
 
     long[] memIncrements;
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index eccd095b97f..292586672a3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -693,9 +693,8 @@ public class AnalyzerTest {
       @Override
       public DataPartition getOrCreateDataPartition(
           List<DataPartitionQueryParam> dataPartitionQueryParams, String 
userName) {
-        int seriesSlotNum = 1000;
-        String partitionExecutorName =
-            
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+        int seriesSlotNum = StatementTestUtils.TEST_SERIES_SLOT_NUM;
+        String partitionExecutorName = 
StatementTestUtils.TEST_PARTITION_EXECUTOR;
         SeriesPartitionExecutor seriesPartitionExecutor =
             SeriesPartitionExecutor.getSeriesPartitionExecutor(
                 partitionExecutorName, seriesSlotNum);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 8e27e3e8cab..de63b0baf24 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
 import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
@@ -43,6 +44,9 @@ import java.util.List;
 
 public class StatementTestUtils {
 
+  public static final String TEST_PARTITION_EXECUTOR = 
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+  public static final int TEST_SERIES_SLOT_NUM = 1000;
+
   private StatementTestUtils() {
     // util class
   }
@@ -190,6 +194,26 @@ public class StatementTestUtils {
         columnCategories);
   }
 
+  public static RelationalInsertRowNode genInsertRowNode(int offset) {
+    String[] measurements = genColumnNames();
+    TSDataType[] dataTypes = genDataTypes();
+    TsTableColumnCategory[] columnCategories = genColumnCategories();
+
+    Object[] values = genValues(offset);
+    long timestamp = genTimestamps(1, offset)[0];
+
+    return new RelationalInsertRowNode(
+        new PlanNodeId(offset + "-" + 1),
+        new PartialPath(new String[] {tableName()}),
+        true,
+        measurements,
+        dataTypes,
+        timestamp,
+        values,
+        false,
+        columnCategories);
+  }
+
 
   public static InsertTabletStatement genInsertTabletStatement(boolean 
writeToTable) {
     return genInsertTabletStatement(writeToTable, 3, 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 89a92968bcb..6937588de56 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
@@ -89,6 +90,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowStatement;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
 
 public class DataRegionTest {
@@ -308,6 +311,55 @@ public class DataRegionTest {
     }
   }
 
+  @Test
+  public void testRelationRowWriteAndSyncClose()
+      throws QueryProcessException, WriteProcessException {
+    RelationalInsertRowNode insertNode1 = genInsertRowNode( 0);
+    dataRegion.insert(insertNode1);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+    RelationalInsertRowNode insertRowNode2 = genInsertRowNode(10);
+    dataRegion.insert(insertRowNode2);
+    dataRegion.asyncCloseAllWorkingTsFileProcessors();
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    String measurementName = "m1";
+    MeasurementSchema measurementSchema = new 
MeasurementSchema(measurementName, TSDataType.DOUBLE);
+    final IDeviceID deviceID1 = insertNode1.getDeviceID();
+    final IDeviceID deviceID2 = insertRowNode2.getDeviceID();
+
+    QueryDataSource queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(
+                new AlignedFullPath(
+                    deviceID1,
+                    Collections.singletonList(measurementName),
+                    Collections.singletonList(measurementSchema))),
+            deviceID1,
+            context,
+            null,
+            null);
+    Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+
+    queryDataSource =
+        dataRegion.query(
+            Collections.singletonList(
+                new AlignedFullPath(
+                    deviceID2,
+                    Collections.singletonList(measurementName),
+                    Collections.singletonList(measurementSchema))),
+            deviceID2,
+            context,
+            null,
+            null);
+    Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    for (TsFileResource resource : queryDataSource.getSeqResources()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
   @Test
   public void testIoTDBTabletWriteAndSyncClose()
       throws QueryProcessException, IllegalPathException, 
WriteProcessException {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 03a37dc5d3c..70cb739f26c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -66,7 +66,7 @@ public class DataPartition extends Partition {
   }
 
   public Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
-      getDataPartitionMap() {
+  getDataPartitionMap() {
     return dataPartitionMap;
   }
 
@@ -78,7 +78,7 @@ public class DataPartition extends Partition {
 
   public List<List<TTimePartitionSlot>> getTimePartitionRange(
       IDeviceID deviceID, Filter timeFilter) {
-    String storageGroup = getStorageGroupByDevice(deviceID);
+    String storageGroup = getDatabaseNameByDevice(deviceID);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceID);
     if (!dataPartitionMap.containsKey(storageGroup)
         || 
!dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
@@ -121,7 +121,7 @@ public class DataPartition extends Partition {
 
   public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
       IDeviceID deviceId, Filter timeFilter) {
-    String storageGroup = getStorageGroupByDevice(deviceId);
+    String storageGroup = getDatabaseNameByDevice(deviceId);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceId);
     if (!dataPartitionMap.containsKey(storageGroup)
         || 
!dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
@@ -151,7 +151,7 @@ public class DataPartition extends Partition {
 
   public List<TRegionReplicaSet> getDataRegionReplicaSet(
       IDeviceID deviceID, TTimePartitionSlot tTimePartitionSlot) {
-    String storageGroup = getStorageGroupByDevice(deviceID);
+    String storageGroup = getDatabaseNameByDevice(deviceID);
     Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> dbMap =
         dataPartitionMap.get(storageGroup);
     if (dbMap == null) {
@@ -175,7 +175,7 @@ public class DataPartition extends Partition {
   public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
       IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList, 
String databaseName) {
     if (databaseName == null) {
-      databaseName = getStorageGroupByDevice(deviceID);
+      databaseName = getDatabaseNameByDevice(deviceID);
     }
     // A list of data region replica sets will store data in a same time 
partition.
     // We will insert data to the last set in the list.
@@ -204,14 +204,13 @@ public class DataPartition extends Partition {
   }
 
   public TRegionReplicaSet getDataRegionReplicaSetForWriting(
-      IDeviceID deviceID, TTimePartitionSlot timePartitionSlot) {
+      IDeviceID deviceID, TTimePartitionSlot timePartitionSlot, String 
databaseName) {
     // A list of data region replica sets will store data in a same time 
partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    String storageGroup = getStorageGroupByDevice(deviceID);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceID);
     Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
-        databasePartitionMap = dataPartitionMap.get(storageGroup);
+        databasePartitionMap = dataPartitionMap.get(databaseName);
     if (databasePartitionMap == null) {
       throw new RuntimeException(
           "Database not exists and failed to create automatically because 
enable_auto_create_schema is FALSE.");
@@ -223,7 +222,13 @@ public class DataPartition extends Partition {
     return regions.get(0);
   }
 
-  private String getStorageGroupByDevice(IDeviceID deviceID) {
+  public TRegionReplicaSet getDataRegionReplicaSetForWriting(
+      IDeviceID deviceID, TTimePartitionSlot timePartitionSlot) {
+    return getDataRegionReplicaSetForWriting(deviceID, timePartitionSlot,
+        getDatabaseNameByDevice(deviceID));
+  }
+
+  private String getDatabaseNameByDevice(IDeviceID deviceID) {
     for (String storageGroup : dataPartitionMap.keySet()) {
       if (PathUtils.isStartWith(deviceID, storageGroup)) {
         return storageGroup;
@@ -257,7 +262,7 @@ public class DataPartition extends Partition {
     requireNonNull(this.dataPartitionMap, "dataPartitionMap is null");
 
     for (Map.Entry<
-            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
+        String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
         dbEntry : targetDataPartition.getDataPartitionMap().entrySet()) {
       String database = dbEntry.getKey();
       if (dataPartitionMap.containsKey(database)) {

Reply via email to