This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch enhance_parititon_related_codes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58645e966b7ced9c51ea6118aba25e5ce55a6449 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Jun 11 14:49:18 2024 +0800 enhance hash algorithm compatibility Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../manager/partition/PartitionManager.java | 15 ++++-- .../plan/analyze/ClusterPartitionFetcher.java | 7 ++- .../analyze/cache/partition/PartitionCache.java | 7 ++- .../plan/analyze/FakePartitionFetcherImpl.java | 5 +- .../plan/analyze/cache/PartitionCacheTest.java | 11 ++-- .../plan/planner/distribution/Util.java | 37 +++++++++---- .../plan/planner/distribution/Util2.java | 21 ++++++-- .../planner/node/write/WritePlanNodeSplitTest.java | 4 +- .../iotdb/commons/partition/DataPartition.java | 16 ++++-- .../apache/iotdb/commons/partition/Partition.java | 6 ++- .../iotdb/commons/partition/SchemaPartition.java | 5 +- .../executor/SeriesPartitionExecutor.java | 7 +++ .../partition/executor/hash/APHashExecutor.java | 33 ++++++++++++ .../partition/executor/hash/BKDRHashExecutor.java | 23 ++++++++ .../partition/executor/hash/JSHashExecutor.java | 23 ++++++++ .../partition/executor/hash/SDBMHashExecutor.java | 23 ++++++++ .../partition/executor/HashExecutorTest.java | 63 ++++++++++++++++++++++ 17 files changed, 271 insertions(+), 35 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 35d82a5454e..d3b983b4342 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -90,6 +90,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1008,7 +1009,7 @@ public class PartitionManager { * @return SeriesPartitionSlot */ public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) { - return executor.getSeriesPartitionSlot(devicePath); + return executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath)); } public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) { @@ -1084,7 +1085,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(req.getDevice()))); } if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified @@ -1119,7 +1122,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else if (req.isSetDevice()) { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(req.getDevice()))); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new GetTimeSlotListResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>()); @@ -1146,7 +1151,9 @@ public class PartitionManager { plan.setDatabase(req.getDatabase()); } else if (req.isSetDevice()) { plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(req.getDevice())); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(req.getDevice())); + plan.setSeriesSlotId( + executor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(req.getDevice()))); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new CountTimeSlotListResp(RpcUtils.SUCCESS_STATUS, 0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java index e76973c516b..682632da79a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java @@ -51,6 +51,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -368,7 +369,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { for (DataPartitionQueryParam queryParam : entry.getValue()) { seriesSlotTimePartitionMap .computeIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(queryParam.getDevicePath())), k -> new ComplexTimeSlotList( queryParam.isNeedLeftAll(), queryParam.isNeedRightAll())) @@ -404,7 +406,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { queryParam.isNeedRightAll()); } deviceToTimePartitionMap.putIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(queryParam.getDevicePath())), sharedTTimeSlotList); } partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 166fca9309a..f65582de803 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.thrift.TException; +import org.apache.tsfile.file.metadata.IDeviceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -540,7 +541,8 @@ public class PartitionCache { // check cache for each device for (String device : entry.getValue()) { TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(device); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(device)); if (!map.containsKey(seriesPartitionSlot)) { // if one device not find, then return cache miss. logger.debug( @@ -706,7 +708,8 @@ public class PartitionCache { TSeriesPartitionSlot seriesPartitionSlot; if (null != dataPartitionQueryParam.getDevicePath()) { seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(dataPartitionQueryParam.getDevicePath())); } else { return false; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java index e33cc2d322c..ccbafac12b9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java @@ -35,6 +35,8 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -276,7 +278,8 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher { for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) { TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(dataPartitionQueryParam.getDevicePath())); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap = sgPartitionMap.computeIfAbsent(seriesPartitionSlot, k -> new HashMap<>()); for (TTimePartitionSlot timePartitionSlot : diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java index ca9fcb062a7..167b6dd0a47 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache; +import org.apache.tsfile.file.metadata.IDeviceID; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -94,7 +95,9 @@ public class PartitionCacheTest { // init each device String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - new TSeriesPartitionSlot(partitionExecutor.getSeriesPartitionSlot(deviceName)); + new TSeriesPartitionSlot( + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName))); // init schemaRegion of device TConsensusGroupId schemaConsensusGroupId = new TConsensusGroupId( @@ -273,7 +276,8 @@ public class PartitionCacheTest { for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) { String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(deviceName); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); Map<String, List<String>> searchMap = new HashMap<>(); searchMap.put(storageGroupName, Collections.singletonList(deviceName)); SchemaPartition schemaPartition = partitionCache.getSchemaPartition(searchMap); @@ -338,7 +342,8 @@ public class PartitionCacheTest { for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP; deviceNumber++) { String deviceName = getDeviceName(storageGroupName, deviceNumber); TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(deviceName); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); // try to get DataPartition from partitionCache Map<String, List<DataPartitionQueryParam>> searchMap = getStorageGroupToQueryParamsMap(storageGroupName, deviceName, false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java index 59cd36a2f8a..96a48dd2562 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java @@ -58,6 +58,7 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Pair; @@ -166,12 +167,24 @@ public class Util { Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new HashMap<>(); d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device6), d6DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device1)), + d1DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device2)), + d2DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device3)), + d3DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device4)), + d4DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device5)), + d5DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device6)), + d6DataRegionMap); dataPartitionMap.put("root.sg", sgPartitionMap); @@ -218,9 +231,15 @@ public class Util { genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2"))); Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device1)), + schemaRegion1); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device2)), + schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device3)), + schemaRegion2); schemaPartitionMap.put("root.sg", schemaRegionMap); schemaPartition.setSchemaPartitionMap(schemaPartitionMap); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java index eb802edd9e5..cfcfeb6bb0f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Pair; @@ -103,8 +104,12 @@ public class Util2 { SeriesPartitionExecutor.getSeriesPartitionExecutor( IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum()); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap); - sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device1)), + d1DataRegionMap); + sgPartitionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device2)), + d2DataRegionMap); DataPartition dataPartition = new DataPartition( IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(), @@ -127,9 +132,15 @@ public class Util2 { Collections.singletonList(genDataNodeLocation(21, "192.0.1.1"))); Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>(); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2); - schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device1)), + schemaRegion1); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device2)), + schemaRegion2); + schemaRegionMap.put( + executor.getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(device3)), + schemaRegion2); Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>(); schemaPartitionMap.put("root.sg", schemaRegionMap); SchemaPartition schemaPartition = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index 45f0457d782..bcf090994c4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -174,7 +175,8 @@ public class WritePlanNodeSplitTest { } TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath()); + partitionExecutor.getSeriesPartitionSlot( + IDeviceID.Factory.DEFAULT_FACTORY.create(dataPartitionQueryParam.getDevicePath())); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = seriesPartitionSlotMap.get(seriesPartitionSlot); Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMapResult = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 4acf680443b..f28307bbcbb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import java.util.ArrayList; @@ -77,7 +78,8 @@ public class DataPartition extends Partition { public List<List<TTimePartitionSlot>> getTimePartitionRange( String deviceName, Filter timeFilter) { String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); if (!dataPartitionMap.containsKey(storageGroup) || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { return Collections.emptyList(); @@ -120,7 +122,8 @@ public class DataPartition extends Partition { public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( String deviceName, Filter timeFilter) { String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); if (!dataPartitionMap.containsKey(storageGroup) || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { return Collections.singletonList(NOT_ASSIGNED); @@ -142,7 +145,8 @@ public class DataPartition extends Partition { if (dbMap == null) { return Collections.singletonList(NOT_ASSIGNED); } - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); Map<TTimePartitionSlot, List<TRegionReplicaSet>> seriesSlotMap = dbMap.get(seriesPartitionSlot); if (seriesSlotMap == null) { return Collections.singletonList(NOT_ASSIGNED); @@ -163,7 +167,8 @@ public class DataPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are // more than 1 Regions for one timeSlot List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>(); @@ -191,7 +196,8 @@ public class DataPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> databasePartitionMap = dataPartitionMap.get(storageGroup); if (databasePartitionMap == null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java index 25b71eb718d..3bb5a292a94 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java @@ -21,6 +21,8 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.List; // todo replace this data structure with PartitionTable @@ -39,8 +41,8 @@ public abstract class Partition { seriesSlotExecutorName, seriesPartitionSlotNum); } - protected TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) { - return executor.getSeriesPartitionSlot(deviceName); + protected TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) { + return executor.getSeriesPartitionSlot(deviceID); } public abstract List<RegionReplicaSetInfo> getDistributionInfo(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java index b8a1004d9b4..07bf60eb903 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -66,7 +68,8 @@ public class SchemaPartition extends Partition { // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName)); if (schemaPartitionMap.get(storageGroup) == null) { throw new RuntimeException( new IoTDBException("Path does not exist. ", TSStatusCode.PATH_NOT_EXIST.getStatusCode())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java index 413275f171f..a07cd600476 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java @@ -16,9 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.partition.executor; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.utils.TestOnly; + +import org.apache.tsfile.file.metadata.IDeviceID; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -36,8 +40,11 @@ public abstract class SeriesPartitionExecutor { this.seriesPartitionSlotNum = seriesPartitionSlotNum; } + @TestOnly public abstract TSeriesPartitionSlot getSeriesPartitionSlot(String device); + public abstract TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID); + public static SeriesPartitionExecutor getSeriesPartitionExecutor( String executorName, int seriesPartitionSlotNum) { if (EXECUTOR == null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java index cad082bfe23..1573502fb65 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java @@ -16,11 +16,16 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; + public class APHashExecutor extends SeriesPartitionExecutor { public APHashExecutor(int deviceGroupCount) { @@ -42,4 +47,32 @@ public class APHashExecutor extends SeriesPartitionExecutor { return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); } + + @Override + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + int hash = 0; + int segmentNum = deviceID.segmentNum(); + int index = 0; + + for (int segmentID = 0; segmentID < segmentNum; segmentID++) { + String segment = (String) deviceID.segment(segmentID); + for (int i = 0; i < segment.length(); i++) { + if ((index++ & 1) == 0) { + hash ^= ((hash << 7) ^ (int) segment.charAt(i) ^ (hash >> 3)); + } else { + hash ^= (~((hash << 11) ^ (int) segment.charAt(i) ^ (hash >> 5))); + } + } + if (segmentID < segmentNum - 1) { + if ((index++ & 1) == 0) { + hash ^= ((hash << 7) ^ (int) PATH_SEPARATOR ^ (hash >> 3)); + } else { + hash ^= (~((hash << 11) ^ (int) PATH_SEPARATOR ^ (hash >> 5))); + } + } + } + hash &= Integer.MAX_VALUE; + + return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java index 47ec3e91d1d..fc1105a8b90 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java @@ -16,14 +16,18 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + public class BKDRHashExecutor extends SeriesPartitionExecutor { private static final int SEED = 131; + private static final char PATH_SEPARATOR = '.'; public BKDRHashExecutor(int deviceGroupCount) { super(deviceGroupCount); @@ -40,4 +44,23 @@ public class BKDRHashExecutor extends SeriesPartitionExecutor { return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); } + + @Override + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + int hash = 0; + int segmentNum = deviceID.segmentNum(); + + for (int segmentID = 0; segmentID < segmentNum; segmentID++) { + String segment = (String) deviceID.segment(segmentID); + for (int i = 0; i < segment.length(); i++) { + hash = hash * SEED + (int) segment.charAt(i); + } + if (segmentID < segmentNum - 1) { + hash = hash * SEED + (int) PATH_SEPARATOR; + } + } + hash &= Integer.MAX_VALUE; + + return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java index a22db711c4d..c3e088e6e8f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java @@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; + public class JSHashExecutor extends SeriesPartitionExecutor { private static final int BASE = 1315423911; @@ -40,4 +44,23 @@ public class JSHashExecutor extends SeriesPartitionExecutor { return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); } + + @Override + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + int hash = BASE; + int segmentNum = deviceID.segmentNum(); + + for (int segmentID = 0; segmentID < segmentNum; segmentID++) { + String segment = (String) deviceID.segment(segmentID); + for (int i = 0; i < segment.length(); i++) { + hash ^= ((hash << 5) + (int) segment.charAt(i) + (hash >> 2)); + } + if (segmentID < segmentNum - 1) { + hash ^= ((hash << 5) + (int) PATH_SEPARATOR + (hash >> 2)); + } + } + hash &= Integer.MAX_VALUE; + + return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java index 105c256e3d1..48cff807bab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java @@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.tsfile.file.metadata.IDeviceID; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; + public class SDBMHashExecutor extends SeriesPartitionExecutor { public SDBMHashExecutor(int deviceGroupCount) { @@ -38,4 +42,23 @@ public class SDBMHashExecutor extends SeriesPartitionExecutor { return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); } + + @Override + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + int hash = 0; + int segmentNum = deviceID.segmentNum(); + + for (int segmentID = 0; segmentID < segmentNum; segmentID++) { + String segment = (String) deviceID.segment(segmentID); + for (int i = 0; i < segment.length(); i++) { + hash = ((int) segment.charAt(i) + (hash << 6) + (hash << 16) - hash); + } + if (segmentID < segmentNum - 1) { + hash = ((int) PATH_SEPARATOR + (hash << 6) + (hash << 16) - hash); + } + } + hash &= Integer.MAX_VALUE; + + return new TSeriesPartitionSlot(hash % seriesPartitionSlotNum); + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java new file mode 100644 index 00000000000..f78f546b50b --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/executor/HashExecutorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.partition.executor; + +import org.apache.iotdb.commons.partition.executor.hash.APHashExecutor; +import org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor; +import org.apache.iotdb.commons.partition.executor.hash.JSHashExecutor; +import org.apache.iotdb.commons.partition.executor.hash.SDBMHashExecutor; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.junit.Assert; +import org.junit.Test; + +public class HashExecutorTest { + private static final String PATH_PREFIX = "root.db.g1.d"; + private static final int TEST_SERIES_SLOT_NUM = 1000; + private static final BKDRHashExecutor BKDR_HASH_EXECUTOR = + new BKDRHashExecutor(TEST_SERIES_SLOT_NUM); + + private static final APHashExecutor AP_HASH_EXECUTOR = new APHashExecutor(TEST_SERIES_SLOT_NUM); + + private static final JSHashExecutor JS_HASH_EXECUTOR = new JSHashExecutor(TEST_SERIES_SLOT_NUM); + + private static final SDBMHashExecutor SDBM_HASH_EXECUTOR = + new SDBMHashExecutor(TEST_SERIES_SLOT_NUM); + + @Test + public void hashExecutorCompatibleTest() { + for (int suffix = 0; suffix < 1000; suffix++) { + String device = PATH_PREFIX + suffix; + IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(PATH_PREFIX + suffix); + Assert.assertEquals( + BKDR_HASH_EXECUTOR.getSeriesPartitionSlot(device), + BKDR_HASH_EXECUTOR.getSeriesPartitionSlot(deviceID)); + Assert.assertEquals( + AP_HASH_EXECUTOR.getSeriesPartitionSlot(device), + AP_HASH_EXECUTOR.getSeriesPartitionSlot(deviceID)); + Assert.assertEquals( + JS_HASH_EXECUTOR.getSeriesPartitionSlot(device), + JS_HASH_EXECUTOR.getSeriesPartitionSlot(deviceID)); + Assert.assertEquals( + SDBM_HASH_EXECUTOR.getSeriesPartitionSlot(device), + SDBM_HASH_EXECUTOR.getSeriesPartitionSlot(deviceID)); + } + } +}
