This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b8a351c26ad4f3793343c0f2f809e4898e40261d Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Apr 13 22:43:35 2022 +0800 add UT for DistributedPlan Write --- .../iotdb/commons/partition/DataPartition.java | 11 ++++- .../planner/plan/WriteFragmentParallelPlanner.java | 2 +- .../db/mpp/sql/plan/DistributionPlannerTest.java | 57 ++++++++++++++++++++-- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 3f677ce2c9..f352d7cdeb 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -68,7 +68,10 @@ public class DataPartition { // A list of data region replica sets will store data in a same time partition. // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition - return Collections.emptyList(); + String storageGroup = getStorageGroupByDevice(deviceName); + SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot + return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> timePartitionSlotList.contains(entry.getKey())).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); } public RegionReplicaSet getDataRegionReplicaSetForWriting( @@ -76,7 +79,11 @@ public class DataPartition { // A list of data region replica sets will store data in a same time partition. // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition - return null; + String storageGroup = getStorageGroupByDevice(deviceName); + SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + List<RegionReplicaSet> regions = dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> entry.getKey().equals(timePartitionSlot)).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); + // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot + return regions.get(0); } private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java index d8c9435214..a31ac6f25f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java @@ -62,7 +62,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType()); - instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet()); + instance.setDataRegionAndHost(split.getRegionReplicaSet()); ret.add(instance); } return ret; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java index 39d9c6a71d..b4b77956f7 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -257,7 +258,7 @@ public class DistributionPlannerTest { } @Test - public void TestWriteParallelPlan() throws IllegalPathException { + public void TestInsertRowNodeParallelPlan() throws IllegalPathException { QueryId queryId = new QueryId("test_write"); InsertRowNode insertRowNode = new InsertRowNode( @@ -282,6 +283,48 @@ public class DistributionPlannerTest { assertEquals(1, plan.getInstances().size()); } + @Test + public void TestInsertRowsNodeParallelPlan() throws IllegalPathException { + QueryId queryId = new QueryId("test_write"); + InsertRowNode insertRowNode1 = + new InsertRowNode( + queryId.genPlanNodeId(), + new PartialPath("root.sg.d1"), + false, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + }, + new TSDataType[] {TSDataType.INT32}, + 1L, + new Object[] {10}); + + InsertRowNode insertRowNode2 = + new InsertRowNode( + queryId.genPlanNodeId(), + new PartialPath("root.sg.d1"), + false, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + }, + new TSDataType[] {TSDataType.INT32}, + 100000L, + new Object[] {10}); + + InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId()); + node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2)); + node.setInsertRowNodeIndexList(Arrays.asList(0, 1)); + + Analysis analysis = constructAnalysis(); + + MPPQueryContext context = + new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint()); + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, node)); + DistributedQueryPlan plan = planner.planFragments(); + plan.getInstances().forEach(System.out::println); + assertEquals(1, plan.getInstances().size()); + } + private Analysis constructAnalysis() { Analysis analysis = new Analysis(); @@ -295,21 +338,25 @@ public class DistributionPlannerTest { Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap = new HashMap<>(); - List<RegionReplicaSet> d1DataRegions = new ArrayList<>(); - d1DataRegions.add( + List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>(); + d1DataRegion1.add( new RegionReplicaSet( new DataRegionId(1), Arrays.asList( new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)), new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000))))); - d1DataRegions.add( + + List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>(); + d1DataRegion2.add( new RegionReplicaSet( new DataRegionId(2), Arrays.asList( new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)), new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000))))); + Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>(); - d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions); + d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1); + d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2); List<RegionReplicaSet> d2DataRegions = new ArrayList<>(); d2DataRegions.add(
