This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new c69f9704e6f adding DataRegionTest for insertRow
c69f9704e6f is described below
commit c69f9704e6f65e1be3b3d5767da7c1ab9a28a5ca
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 8 12:35:57 2024 +0800
adding DataRegionTest for insertRow
---
.../planner/plan/node/write/InsertRowNode.java | 2 +-
.../planner/plan/node/write/InsertRowsNode.java | 3 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 26 ++++++++---
.../plan/relational/planner/RelationPlanner.java | 1 +
.../plan/statement/crud/InsertBaseStatement.java | 13 ++++++
.../plan/statement/crud/InsertRowStatement.java | 9 ----
.../plan/statement/crud/InsertTabletStatement.java | 10 -----
.../dataregion/memtable/TsFileProcessor.java | 29 ++++++------
.../plan/relational/analyzer/AnalyzerTest.java | 5 +--
.../plan/statement/StatementTestUtils.java | 24 ++++++++++
.../storageengine/dataregion/DataRegionTest.java | 52 ++++++++++++++++++++++
.../iotdb/commons/partition/DataPartition.java | 25 ++++++-----
12 files changed, 142 insertions(+), 57 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 16138cafa2e..0635f30c80c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -113,7 +113,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
- devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot);
+ getDeviceID(), timePartitionSlot, analysis.getDatabaseName());
// collect redirectInfo
analysis.setRedirectNodeList(
Collections.singletonList(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 958d04c195c..820897565a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -238,7 +238,8 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
insertRowNode.devicePath.getIDeviceIDAsFullDevice(),
- TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
+ TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
+ analysis.getDatabaseName());
// Collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 69c99373360..dcbe301f2b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -65,10 +65,14 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
*/
private List<Integer> insertRowNodeIndexList;
- /** the InsertRowsNode list */
+ /**
+ * the InsertRowsNode list
+ */
private List<InsertRowNode> insertRowNodeList;
- /** record the result of insert rows */
+ /**
+ * record the result of insert rows
+ */
private Map<Integer, TSStatus> results = new HashMap<>();
public InsertRowsOfOneDeviceNode(PlanNodeId id) {
@@ -127,7 +131,8 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ }
@Override
public PlanNodeType getType() {
@@ -165,7 +170,8 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
- devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot);
+ devicePath.getIDeviceIDAsFullDevice(), timePartitionSlot,
+ analysis.getDatabaseName());
Map<TTimePartitionSlot, List<InsertRowNode>> tmpMap =
splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new HashMap<>());
Map<TTimePartitionSlot, List<Integer>> tmpIndexMap =
@@ -302,9 +308,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
InsertRowsOfOneDeviceNode that = (InsertRowsOfOneDeviceNode) o;
return Objects.equals(insertRowNodeIndexList, that.insertRowNodeIndexList)
&& Objects.equals(insertRowNodeList, that.insertRowNodeList);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 8c40135fd56..da60edf80f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -213,6 +213,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
return new RelationPlan(insertNode, analysis.getRootScope(),
Collections.emptyList());
}
+ @Override
protected RelationPlan visitInsertRow(InsertRow node, Void context) {
InsertRowStatement insertRowStatement = node.getInnerTreeStatement();
RelationalInsertRowNode insertNode =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index fcccc7df97b..e1eda31d7b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -75,6 +75,7 @@ public abstract class InsertBaseStatement extends Statement {
protected TsTableColumnCategory[] columnCategories;
protected List<Integer> idColumnIndices;
protected List<Integer> attrColumnIndices;
+ protected boolean writeToTable = false;
// region params used by analyzing logical views.
@@ -480,5 +481,17 @@ public abstract class InsertBaseStatement extends Statement {
CommonUtils.swapArray(columnCategories, src, target);
idColumnIndices = null;
}
+
+ public boolean isWriteToTable() {
+ return writeToTable;
+ }
+
+ public void setWriteToTable(boolean writeToTable) {
+ this.writeToTable = writeToTable;
+ if (writeToTable) {
+ isAligned = true;
+ }
+ }
+
// endregion
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 29579f8e11e..be8c38d1107 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -76,7 +76,6 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
*/
private boolean[] measurementIsAligned;
- private boolean isWriteToTable = false;
private IDeviceID deviceID;
public InsertRowStatement() {
@@ -452,14 +451,6 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
}
- public boolean isWriteToTable() {
- return isWriteToTable;
- }
-
- public void setWriteToTable(boolean writeToTable) {
- isWriteToTable = writeToTable;
- }
-
public IDeviceID getTableDeviceID() {
if (deviceID == null) {
String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 5ad2c48cf14..dff1e992c9f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -79,8 +79,6 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
*/
private boolean[] measurementIsAligned;
- private boolean writeToTable = false;
-
public InsertTabletStatement() {
super();
statementType = StatementType.BATCH_INSERT;
@@ -441,14 +439,6 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
}
- public boolean isWriteToTable() {
- return writeToTable;
- }
-
- public void setWriteToTable(boolean writeToTable) {
- this.writeToTable = writeToTable;
- }
-
@Override
public Statement toRelationalStatement(MPPQueryContext context) {
return new InsertTablet(this, context);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 514f99c6163..c16f03424a6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -250,14 +250,7 @@ public class TsFileProcessor {
logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile());
}
- /**
- * Insert data in an InsertRowNode into the workingMemtable.
- *
- * @param insertRowNode physical plan of insertion
- */
- public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
- throws WriteProcessException {
-
+ private void ensureMemTable(long[] costsForMetrics) {
if (workMemTable == null) {
long startTime = System.nanoTime();
createNewWorkingMemTable();
@@ -267,6 +260,17 @@ public class TsFileProcessor {
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
}
+ }
+
+ /**
+ * Insert data in an InsertRowNode into the workingMemtable.
+ *
+ * @param insertRowNode physical plan of insertion
+ */
+ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
+ throws WriteProcessException {
+
+ ensureMemTable(costsForMetrics);
long[] memIncrements;
long memControlStartTime = System.nanoTime();
@@ -338,14 +342,7 @@ public class TsFileProcessor {
public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics)
throws WriteProcessException {
- if (workMemTable == null) {
- long startTime = System.nanoTime();
- createNewWorkingMemTable();
- // recordCreateMemtableBlockCost
- costsForMetrics[0] += System.nanoTime() - startTime;
- WritingMetrics.getInstance()
- .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
- }
+ ensureMemTable(costsForMetrics);
long[] memIncrements;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index eccd095b97f..292586672a3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -693,9 +693,8 @@ public class AnalyzerTest {
@Override
public DataPartition getOrCreateDataPartition(
List<DataPartitionQueryParam> dataPartitionQueryParams, String
userName) {
- int seriesSlotNum = 1000;
- String partitionExecutorName =
- "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+ int seriesSlotNum = StatementTestUtils.TEST_SERIES_SLOT_NUM;
+ String partitionExecutorName =
StatementTestUtils.TEST_PARTITION_EXECUTOR;
SeriesPartitionExecutor seriesPartitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
partitionExecutorName, seriesSlotNum);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 8e27e3e8cab..de63b0baf24 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
@@ -43,6 +44,9 @@ import java.util.List;
public class StatementTestUtils {
+ public static final String TEST_PARTITION_EXECUTOR =
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+ public static final int TEST_SERIES_SLOT_NUM = 1000;
+
private StatementTestUtils() {
// util class
}
@@ -190,6 +194,26 @@ public class StatementTestUtils {
columnCategories);
}
+ public static RelationalInsertRowNode genInsertRowNode(int offset) {
+ String[] measurements = genColumnNames();
+ TSDataType[] dataTypes = genDataTypes();
+ TsTableColumnCategory[] columnCategories = genColumnCategories();
+
+ Object[] values = genValues(offset);
+ long timestamp = genTimestamps(1, offset)[0];
+
+ return new RelationalInsertRowNode(
+ new PlanNodeId(offset + "-" + 1),
+ new PartialPath(new String[] {tableName()}),
+ true,
+ measurements,
+ dataTypes,
+ timestamp,
+ values,
+ false,
+ columnCategories);
+ }
+
public static InsertTabletStatement genInsertTabletStatement(boolean
writeToTable) {
return genInsertTabletStatement(writeToTable, 3, 0);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 89a92968bcb..6937588de56 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
@@ -89,6 +90,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode;
+import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowStatement;
import static
org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode;
public class DataRegionTest {
@@ -308,6 +311,55 @@ public class DataRegionTest {
}
}
+ @Test
+ public void testRelationRowWriteAndSyncClose()
+ throws QueryProcessException, WriteProcessException {
+ RelationalInsertRowNode insertNode1 = genInsertRowNode( 0);
+ dataRegion.insert(insertNode1);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+
+ RelationalInsertRowNode insertRowNode2 = genInsertRowNode(10);
+ dataRegion.insert(insertRowNode2);
+ dataRegion.asyncCloseAllWorkingTsFileProcessors();
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ String measurementName = "m1";
+ MeasurementSchema measurementSchema = new
MeasurementSchema(measurementName, TSDataType.DOUBLE);
+ final IDeviceID deviceID1 = insertNode1.getDeviceID();
+ final IDeviceID deviceID2 = insertRowNode2.getDeviceID();
+
+ QueryDataSource queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(
+ new AlignedFullPath(
+ deviceID1,
+ Collections.singletonList(measurementName),
+ Collections.singletonList(measurementSchema))),
+ deviceID1,
+ context,
+ null,
+ null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+
+ queryDataSource =
+ dataRegion.query(
+ Collections.singletonList(
+ new AlignedFullPath(
+ deviceID2,
+ Collections.singletonList(measurementName),
+ Collections.singletonList(measurementSchema))),
+ deviceID2,
+ context,
+ null,
+ null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ for (TsFileResource resource : queryDataSource.getSeqResources()) {
+ Assert.assertTrue(resource.isClosed());
+ }
+ }
+
@Test
public void testIoTDBTabletWriteAndSyncClose()
throws QueryProcessException, IllegalPathException,
WriteProcessException {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 03a37dc5d3c..70cb739f26c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -66,7 +66,7 @@ public class DataPartition extends Partition {
}
public Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
- getDataPartitionMap() {
+ getDataPartitionMap() {
return dataPartitionMap;
}
@@ -78,7 +78,7 @@ public class DataPartition extends Partition {
public List<List<TTimePartitionSlot>> getTimePartitionRange(
IDeviceID deviceID, Filter timeFilter) {
- String storageGroup = getStorageGroupByDevice(deviceID);
+ String storageGroup = getDatabaseNameByDevice(deviceID);
TSeriesPartitionSlot seriesPartitionSlot =
calculateDeviceGroupId(deviceID);
if (!dataPartitionMap.containsKey(storageGroup)
||
!dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
@@ -121,7 +121,7 @@ public class DataPartition extends Partition {
public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
IDeviceID deviceId, Filter timeFilter) {
- String storageGroup = getStorageGroupByDevice(deviceId);
+ String storageGroup = getDatabaseNameByDevice(deviceId);
TSeriesPartitionSlot seriesPartitionSlot =
calculateDeviceGroupId(deviceId);
if (!dataPartitionMap.containsKey(storageGroup)
||
!dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
@@ -151,7 +151,7 @@ public class DataPartition extends Partition {
public List<TRegionReplicaSet> getDataRegionReplicaSet(
IDeviceID deviceID, TTimePartitionSlot tTimePartitionSlot) {
- String storageGroup = getStorageGroupByDevice(deviceID);
+ String storageGroup = getDatabaseNameByDevice(deviceID);
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> dbMap =
dataPartitionMap.get(storageGroup);
if (dbMap == null) {
@@ -175,7 +175,7 @@ public class DataPartition extends Partition {
public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList,
String databaseName) {
if (databaseName == null) {
- databaseName = getStorageGroupByDevice(deviceID);
+ databaseName = getDatabaseNameByDevice(deviceID);
}
// A list of data region replica sets will store data in a same time
partition.
// We will insert data to the last set in the list.
@@ -204,14 +204,13 @@ public class DataPartition extends Partition {
}
public TRegionReplicaSet getDataRegionReplicaSetForWriting(
- IDeviceID deviceID, TTimePartitionSlot timePartitionSlot) {
+ IDeviceID deviceID, TTimePartitionSlot timePartitionSlot, String
databaseName) {
// A list of data region replica sets will store data in a same time
partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- String storageGroup = getStorageGroupByDevice(deviceID);
TSeriesPartitionSlot seriesPartitionSlot =
calculateDeviceGroupId(deviceID);
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
- databasePartitionMap = dataPartitionMap.get(storageGroup);
+ databasePartitionMap = dataPartitionMap.get(databaseName);
if (databasePartitionMap == null) {
throw new RuntimeException(
"Database not exists and failed to create automatically because
enable_auto_create_schema is FALSE.");
@@ -223,7 +222,13 @@ public class DataPartition extends Partition {
return regions.get(0);
}
- private String getStorageGroupByDevice(IDeviceID deviceID) {
+ public TRegionReplicaSet getDataRegionReplicaSetForWriting(
+ IDeviceID deviceID, TTimePartitionSlot timePartitionSlot) {
+ return getDataRegionReplicaSetForWriting(deviceID, timePartitionSlot,
+ getDatabaseNameByDevice(deviceID));
+ }
+
+ private String getDatabaseNameByDevice(IDeviceID deviceID) {
for (String storageGroup : dataPartitionMap.keySet()) {
if (PathUtils.isStartWith(deviceID, storageGroup)) {
return storageGroup;
@@ -257,7 +262,7 @@ public class DataPartition extends Partition {
requireNonNull(this.dataPartitionMap, "dataPartitionMap is null");
for (Map.Entry<
- String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
dbEntry : targetDataPartition.getDataPartitionMap().entrySet()) {
String database = dbEntry.getKey();
if (dataPartitionMap.containsKey(database)) {