This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Use-IDeviceId-for-partition-executor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b84b2df1565de4dec9492e54288a7dbfbbb46610 Author: YongzaoDan <[email protected]> AuthorDate: Fri Apr 26 15:24:07 2024 +0800 maybe finish --- .../confignode/it/utils/ConfigNodeTestUtils.java | 2 +- .../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +- .../manager/partition/PartitionManager.java | 15 ++++++-- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../plan/analyze/ClusterPartitionFetcher.java | 7 +++- .../analyze/cache/partition/PartitionCache.java | 7 +++- .../plan/analyze/FakePartitionFetcherImpl.java | 5 ++- .../plan/analyze/cache/PartitionCacheTest.java | 11 ++++-- .../plan/planner/distribution/Util.java | 37 +++++++++++++----- .../plan/planner/distribution/Util2.java | 21 +++++++--- .../planner/node/write/WritePlanNodeSplitTest.java | 4 +- .../resources/conf/iotdb-common.properties | 7 +--- .../iotdb/commons/partition/DataPartition.java | 16 +++++--- .../apache/iotdb/commons/partition/Partition.java | 6 ++- .../iotdb/commons/partition/SchemaPartition.java | 5 ++- .../BKDRHashExecutor.java => HashExecutor.java} | 27 ++++++++----- .../executor/SeriesPartitionExecutor.java | 5 ++- .../partition/executor/hash/APHashExecutor.java | 45 ---------------------- .../partition/executor/hash/JSHashExecutor.java | 43 --------------------- .../partition/executor/hash/SDBMHashExecutor.java | 41 -------------------- .../partition/executor/HashExecutorTest.java} | 31 ++++++++++----- 21 files changed, 146 insertions(+), 193 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java index 7c6719e6701..ce0829ab3b9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java @@ -319,7 +319,7 @@ public class ConfigNodeTestUtils { "org.apache.iotdb.consensus.simple.SimpleConsensus"); clusterParameters.setSeriesPartitionSlotNum(1000); clusterParameters.setSeriesPartitionExecutorClass( - "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor"); + "org.apache.iotdb.commons.partition.executor.HashExecutor"); clusterParameters.setDefaultTTL(Long.MAX_VALUE); clusterParameters.setTimePartitionInterval(604800000); clusterParameters.setDataReplicationFactor(1); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 3875bf09080..b4c2d18300e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -76,7 +76,7 @@ public class ConfigNodeConfig { /** SeriesPartitionSlot executor class. */ private String seriesPartitionExecutorClass = - "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor"; + "org.apache.iotdb.commons.partition.executor.HashExecutor"; /** The policy of extension SchemaRegionGroup for each Database. */ private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 500a6d82660..5426db6d52e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -90,6 +90,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1001,7 +1002,7 @@ public class PartitionManager { * @return SeriesPartitionSlot */ public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) { - return executor.getSeriesPartitionSlot(devicePath); + return executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(devicePath)); } public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) { @@ -1077,7 +1078,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(req.getDevice()))); } if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified @@ -1112,7 +1115,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else if (req.isSetDevice()) { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(req.getDevice()))); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new GetTimeSlotListResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>()); @@ -1139,7 +1144,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else if (req.isSetDevice()) { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(req.getDevice()))); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new CountTimeSlotListResp(RpcUtils.SUCCESS_STATUS, 0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index f958d4a76c1..fadda89b37a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -911,7 +911,7 @@ public class IoTDBConfig { * series partition */ private String seriesPartitionExecutorClass = - "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor"; + "org.apache.iotdb.commons.partition.executor.HashExecutor"; /** The number of series partitions in a database */ private int seriesPartitionSlotNum = 10000; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java index 1069aca33b8..c7b9f0f9299 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java @@ -51,6 +51,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -367,7 +368,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { for (DataPartitionQueryParam queryParam : entry.getValue()) { seriesSlotTimePartitionMap .computeIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(queryParam.getDevicePath())), k -> new ComplexTimeSlotList( queryParam.isNeedLeftAll(), queryParam.isNeedRightAll())) @@ -403,7 +405,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { queryParam.isNeedRightAll()); } deviceToTimePartitionMap.putIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(queryParam.getDevicePath())), sharedTTimeSlotList); } partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 831ee181d44..776a0668e0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.thrift.TException; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -536,7 +537,8 @@ public class PartitionCache { // check cache for each device for (String device : entry.getValue()) { TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(device); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(device)); if (!map.containsKey(seriesPartitionSlot)) { // if one device not find, then return cache miss. logger.debug( @@ -701,7 +703,8 @@ public class PartitionCache { TSeriesPartitionSlot seriesPartitionSlot; if (null != dataPartitionQueryParam.getDevicePath()) { seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(dataPartitionQueryParam.getDevicePath())); } else { return false; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java index e33cc2d322c..b229f58a026 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java @@ -35,6 +35,8 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -276,7 +278,8 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher { for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) { TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(dataPartitionQueryParam.getDevicePath())); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap = sgPartitionMap.computeIfAbsent(seriesPartitionSlot, k -> new HashMap<>()); for (TTimePartitionSlot timePartitionSlot : diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java index ca9fcb062a7..b44ff588933 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -94,7 +95,9 @@ public class PartitionCacheTest { // init each device String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - new TSeriesPartitionSlot(partitionExecutor.getSeriesPartitionSlot(deviceName)); + new TSeriesPartitionSlot( + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(deviceName))); // init schemaRegion of device TConsensusGroupId schemaConsensusGroupId = new TConsensusGroupId( @@ -273,7 +276,8 @@ public class PartitionCacheTest { for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) { String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(deviceName); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(deviceName)); Map<String, List<String>> searchMap = new HashMap<>(); searchMap.put(storageGroupName, Collections.singletonList(deviceName)); SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap); @@ -338,7 +342,8 @@ public class PartitionCacheTest { for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) { String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(deviceName); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(deviceName)); // try to get DataPartition from partitionCache Map<String, List<DataPartitionQueryParam>> searchMap = getStorageGroupToQueryParamsMap(storageGroupName, deviceName, false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java index 59cd36a2f8a..c02d30873b4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java @@ -58,6 +58,7 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Pair; @@ -166,12 +167,24 @@ public class Util { Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new HashMap<>(); d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device6), d6DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device1)), + d1DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device2)), + d2DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device3)), + d3DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device4)), + d4DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device5)), + d5DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device6)), + d6DataRegionMap); dataPartitionMap.put("root.sg", sgPartitionMap); @@ -218,9 +231,15 @@ public class Util { genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device1)), + schemaRegion1); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device2)), + schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device3)), + schemaRegion2); schemaPartitionMap.put("root.sg", schemaRegionMap); schemaPartition.setSchemaPartitionMap(schemaPartitionMap); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java index eb802edd9e5..ba93316fdc4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Pair; @@ -103,8 +104,12 @@ public class Util2 { SeriesPartitionExecutor.getSeriesPartitionExecutor( IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device1)), + d1DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device2)), + d2DataRegionMap); DataPartition dataPartition = new DataPartition( IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), @@ -127,9 +132,15 @@ public class Util2 { Collections.singletonList(genDataNodeLocation(21, "192.0.1.1"))); Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device1)), + schemaRegion1); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device2)), + schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(StringArrayDeviceID.getFACTORY().create(device3)), + schemaRegion2); Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>(); schemaPartitionMap.put("root.sg", schemaRegionMap); SchemaPartition schemaPartition = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index 45f0457d782..f12b9600ce3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -174,7 +175,8 @@ public class WritePlanNodeSplitTest { } TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + StringArrayDeviceID.getFACTORY().create(dataPartitionQueryParam.getDevicePath())); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = seriesPartitionSlotMap.get(seriesPartitionSlot); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMapResult = diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 9f234a7f0ca..6bc47f1cb28 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -75,14 +75,11 @@ data_replication_factor=1 # SeriesPartitionSlot executor class # These hashing algorithms are currently supported: -# 1. BKDRHashExecutor(Default) -# 2. APHashExecutor -# 3. JSHashExecutor -# 4. SDBMHashExecutor +# 1. HashExecutor(Default) # Also, if you want to implement your own SeriesPartition executor, you can inherit the SeriesPartitionExecutor class and # modify this parameter to correspond to your Java class # Datatype: String -# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor +# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.HashExecutor # The policy of extension SchemaRegionGroup for each Database. # These policies are currently supported: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 4acf680443b..722dbb6cd1a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import java.util.ArrayList; @@ -77,7 +78,8 @@ public class DataPartition extends Partition { public List<List<TTimePartitionSlot>> getTimePartitionRange( String deviceName, Filter timeFilter) { String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); if (!dataPartitionMap.containsKey(storageGroup) || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { return Collections.emptyList(); @@ -120,7 +122,8 @@ public class DataPartition extends Partition { public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( String deviceName, Filter timeFilter) { String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); if (!dataPartitionMap.containsKey(storageGroup) || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { return Collections.singletonList(NOT_ASSIGNED); @@ -142,7 +145,8 @@ public class DataPartition extends Partition { if (dbMap == null) { return Collections.singletonList(NOT_ASSIGNED); } - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); Map<TTimePartitionSlot, List<TRegionReplicaSet>> seriesSlotMap = dbMap.get(seriesPartitionSlot); if (seriesSlotMap == null) { return Collections.singletonList(NOT_ASSIGNED); @@ -163,7 +167,8 @@ public class DataPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are // more than 1 Regions for one timeSlot List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>(); @@ -191,7 +196,8 @@ public class DataPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> databasePartitionMap = dataPartitionMap.get(storageGroup); if (databasePartitionMap == null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java index 25b71eb718d..3bb5a292a94 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java @@ -21,6 +21,8 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.List; // todo replace this data structure with PartitionTable @@ -39,8 +41,8 @@ public abstract class Partition { seriesSlotExecutorName, seriesPartitionSlotNum); } - protected TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) { - return executor.getSeriesPartitionSlot(deviceName); + protected TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) { + return executor.getSeriesPartitionSlot(deviceID); } public abstract List<RegionReplicaSetInfo> getDistributionInfo(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java index b8a1004d9b4..ec6cb870ef0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -66,7 +68,8 @@ public class SchemaPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(StringArrayDeviceID.getFACTORY().create(deviceName)); if (schemaPartitionMap.get(storageGroup) == null) { throw new RuntimeException( new IoTDBException("Path does not exist. ", TSStatusCode.PATH_NOT_EXIST.getStatusCode())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/HashExecutor.java similarity index 60% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/HashExecutor.java index 47ec3e91d1d..2569dee1add 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/HashExecutor.java @@ -16,28 +16,35 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.partition.executor.hash; + +package org.apache.iotdb.commons.partition.executor; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -public class BKDRHashExecutor extends SeriesPartitionExecutor { +import org.apache.tsfile.file.metadata.IDeviceID; + +public class HashExecutor extends SeriesPartitionExecutor { private static final int SEED = 131; + private static final char PATH_SEPARATOR = '.'; - public BKDRHashExecutor(int deviceGroupCount) { + public HashExecutor(int deviceGroupCount) { super(deviceGroupCount); } @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(String device) { - int hash = 0; - - for (int i = 0; i < device.length(); i++) { - hash = hash * SEED + (int) device.charAt(i); + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + int hash = 0, segmentNum = deviceID.segmentNum(); + for (int segmentID = 0; segmentID < segmentNum; segmentID++) { + String segment = (String) deviceID.segment(segmentID); + for (int i = 0; i < segment.length(); i++) { + hash = hash * SEED + segment.charAt(i); + } + if (segmentID < segmentNum - 1) { + hash = hash * SEED + PATH_SEPARATOR; + } } hash &= Integer.MAX_VALUE; - return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java index 413275f171f..3c0df7720a7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java @@ -16,10 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.partition.executor; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -36,7 +39,7 @@ public abstract class SeriesPartitionExecutor { this.seriesPartitionSlotNum = seriesPartitionSlotNum; } - public abstract TSeriesPartitionSlot getSeriesPartitionSlot(String device); + public abstract TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID); public static SeriesPartitionExecutor getSeriesPartitionExecutor( String executorName, int seriesPartitionSlotNum) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java deleted file mode 100644 index cad082bfe23..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.partition.executor.hash; - -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; - -public class APHashExecutor extends SeriesPartitionExecutor { - - public APHashExecutor(int deviceGroupCount) { - super(deviceGroupCount); - } - - @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(String device) { - int hash = 0; - - for (int i = 0; i < device.length(); i++) { - if ((i & 1) == 0) { - hash ^= ((hash << 7) ^ (int) device.charAt(i) ^ (hash >> 3)); - } else { - hash ^= (~((hash << 11) ^ (int) device.charAt(i) ^ (hash >> 5))); - } - } - hash &= Integer.MAX_VALUE; - - return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java deleted file mode 100644 index a22db711c4d..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.partition.executor.hash; - -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; - -public class JSHashExecutor extends SeriesPartitionExecutor { - - private static final int BASE = 1315423911; - - public JSHashExecutor(int deviceGroupCount) { - super(deviceGroupCount); - } - - @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(String device) { - int hash = BASE; - - for (int i = 0; i < device.length(); i++) { - hash ^= ((hash << 5) + (int) device.charAt(i) + (hash >> 2)); - } - hash &= Integer.MAX_VALUE; - - return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java deleted file mode 100644 index 105c256e3d1..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.commons.partition.executor.hash; - -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; - -public class SDBMHashExecutor extends SeriesPartitionExecutor { - - public SDBMHashExecutor(int deviceGroupCount) { - super(deviceGroupCount); - } - - @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(String device) { - int hash = 0; - - for (int i = 0; i < device.length(); i++) { - hash = ((int) device.charAt(i) + (hash << 6) + (hash << 16) - hash); - } - hash &= Integer.MAX_VALUE; - - return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java similarity index 53% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java rename to iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java index 47ec3e91d1d..e0837dfe12c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java @@ -16,28 +16,39 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.commons.partition.executor.hash; + +package org.apache.iotdb.commons.partition.executor; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -public class BKDRHashExecutor extends SeriesPartitionExecutor { +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.junit.Assert; +import org.junit.Test; + +public class HashExecutorTest { private static final int SEED = 131; + private static final String PATH_PREFIX = "root.db.d"; + private static final int TEST_SERIES_SLOT_NUM = 1000; + private static final HashExecutor EXECUTOR = new HashExecutor(TEST_SERIES_SLOT_NUM); - public BKDRHashExecutor(int deviceGroupCount) { - super(deviceGroupCount); + @Test + public void hashExecutorCompatibleTest() { + for (int suffix = 0; suffix < 1000; suffix++) { + String device = PATH_PREFIX + suffix; + IDeviceID deviceID = StringArrayDeviceID.getFACTORY().create(PATH_PREFIX + suffix); + Assert.assertEquals( + oldVersionGetSeriesPartitionSlot(device), EXECUTOR.getSeriesPartitionSlot(deviceID)); + } } - @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(String device) { + private TSeriesPartitionSlot oldVersionGetSeriesPartitionSlot(String device) { int hash = 0; - for (int i = 0; i < device.length(); i++) { hash = hash * SEED + (int) device.charAt(i); } hash &= Integer.MAX_VALUE; - - return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); + return new TSeriesPartitionSlot(hash % TEST_SERIES_SLOT_NUM); } }
