This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch trans-pd in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 649b7deb9ba126034a92e39e1e8f95d4afa823ab Author: VGalaxies <[email protected]> AuthorDate: Sun May 12 12:25:44 2024 +0800 translate pd core --- .../org/apache/hugegraph/pd/ConfigService.java | 7 +- .../java/org/apache/hugegraph/pd/IdService.java | 5 +- .../java/org/apache/hugegraph/pd/KvService.java | 1 - .../hugegraph/pd/PartitionInstructionListener.java | 2 +- .../org/apache/hugegraph/pd/PartitionService.java | 268 ++++++++++----------- .../hugegraph/pd/PartitionStatusListener.java | 2 +- .../org/apache/hugegraph/pd/StoreNodeService.java | 159 ++++++------ .../apache/hugegraph/pd/TaskScheduleService.java | 169 ++++++------- .../org/apache/hugegraph/pd/config/PDConfig.java | 25 +- .../apache/hugegraph/pd/meta/ConfigMetaStore.java | 2 +- .../org/apache/hugegraph/pd/meta/IdMetaStore.java | 45 +--- .../apache/hugegraph/pd/meta/MetadataFactory.java | 2 +- .../hugegraph/pd/meta/MetadataRocksDBStore.java | 1 - .../hugegraph/pd/meta/MetadataStoreBase.java | 13 +- .../apache/hugegraph/pd/meta/PartitionMeta.java | 34 +-- .../apache/hugegraph/pd/meta/StoreInfoMeta.java | 17 +- .../org/apache/hugegraph/pd/meta/TaskInfoMeta.java | 8 +- .../org/apache/hugegraph/pd/raft/KVOperation.java | 4 +- .../org/apache/hugegraph/pd/raft/RaftEngine.java | 22 +- .../apache/hugegraph/pd/raft/RaftRpcClient.java | 2 +- .../apache/hugegraph/pd/raft/RaftStateMachine.java | 1 - .../apache/hugegraph/pd/raft/RaftTaskHandler.java | 2 +- .../org/apache/hugegraph/pd/store/RaftKVStore.java | 4 +- 23 files changed, 378 insertions(+), 417 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java index cc28c1b0a..07ac73af4 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java @@ -71,7 +71,7 @@ public class ConfigService implements RaftStateListener { } /** - * 从存储中读取配置项,并覆盖全局的PDConfig对象 + * Read the configuration item from the storage and overwrite the global PD Config object * * @return */ @@ -120,8 +120,9 @@ public class ConfigService implements RaftStateListener { } /** - * meta store中的数量 - * 由于可能会受分区分裂/合并的影响,原始的partition count不推荐使用 + * Meta store + * The original partition count is not recommended due to the fact that it may be affected by + * partition splitting/merging * * @return partition count of cluster * @throws PDException when io error diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/IdService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/IdService.java index 0c854d06d..a80052dac 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/IdService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/IdService.java @@ -49,7 +49,8 @@ public class IdService { } /** - * 获取自增循环不重复id, 达到上限后从0开始自增.自动跳过正在使用的cid + * Obtain the non-duplicate ID of the auto-increment cycle, and automatically increment from + * 0 after the upper limit is reached * * @param key * @param max @@ -65,7 +66,7 @@ public class IdService { } /** - * 删除一个自增循环id + * Delete an auto-increment loop ID * * @param key * @param cid diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java index f31196f81..c693a67b4 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java +++ 
b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java @@ -44,7 +44,6 @@ import lombok.extern.slf4j.Slf4j; public class KvService { public static final char KV_DELIMITER = '@'; - // TODO 主前缀之后,增加类名做区分 private static final String TTL_PREFIX = "T"; private static final String KV_PREFIX = "K"; private static final String LOCK_PREFIX = "L"; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java index 2b1e4a637..0ed4e855b 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java @@ -28,7 +28,7 @@ import org.apache.hugegraph.pd.grpc.pulse.SplitPartition; import org.apache.hugegraph.pd.grpc.pulse.TransferLeader; /** - * 分区命令监听 + * Partition command listening */ public interface PartitionInstructionListener { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java index c8ec3e3e7..6ec779d27 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java @@ -52,7 +52,7 @@ import org.apache.hugegraph.pd.raft.RaftStateListener; import lombok.extern.slf4j.Slf4j; /** - * 分区管理 + * Partition management */ @Slf4j public class PartitionService implements RaftStateListener { @@ -61,10 +61,10 @@ public class PartitionService implements RaftStateListener { private final StoreNodeService storeService; private final PartitionMeta partitionMeta; private final PDConfig pdConfig; - // 分区命令监听 + // Partition command listening private final List<PartitionInstructionListener> instructionListeners; - // 分区状态监听 + // Partition status listeners private final List<PartitionStatusListener> statusListeners; public PartitionService(PDConfig config, StoreNodeService storeService) { @@ -83,7 +83,8 @@ public class PartitionService implements RaftStateListener { public void onStoreStatusChanged(Metapb.Store store, Metapb.StoreState old, Metapb.StoreState status) { if (status == Metapb.StoreState.Tombstone) { - // Store被停机,通知所有该store所有分区,迁移数据 + // When the store is stopped, notify all partitions of the store and migrate + // the data storeOffline(store); } } @@ -103,7 +104,7 @@ public class PartitionService implements RaftStateListener { } /** - * 返回Key所属的partition + * return key partition * * @param graphName * @param key @@ -116,7 +117,7 @@ public class PartitionService implements RaftStateListener { } /** - * 根据hashcode返回所属的partition + * Returns the partition to which it belongs based on the hashcode * * @param graphName * @param code @@ -127,7 +128,7 @@ public class PartitionService implements RaftStateListener { if (code < 0 || code >= PartitionUtils.MAX_VALUE) { throw new PDException(Pdpb.ErrorType.NOT_FOUND_VALUE, "code error"); } - // 根据Code查找分区id,如果没有找到,创建新的分区 + // Find the partition ID based on the code, and if it doesn't find, create a new partition Metapb.Partition partition = partitionMeta.getPartitionByCode(graphName, code); if (partition == null) { @@ -152,7 +153,7 @@ public class PartitionService implements RaftStateListener { } /** - * 根据ID返回分区信息 + * Returns partition information based on ID * * @param graphName * @param partId @@ -168,7 +169,6 
@@ public class PartitionService implements RaftStateListener { Metapb.PartitionShard partShard = Metapb.PartitionShard.newBuilder() .setPartition(partition) - // 此处需要返回正确的leader,暂时默认取第一个 .setLeader(storeService.getLeader( partition, 0)) .build(); @@ -185,7 +185,7 @@ public class PartitionService implements RaftStateListener { } /** - * 获取图的所有分区 + * Get all partitions of the graph */ public List<Metapb.Partition> getPartitions() { return partitionMeta.getPartitions(); @@ -199,7 +199,7 @@ public class PartitionService implements RaftStateListener { } /** - * 查找在store上的所有分区 + * Find all the partitions on the store * * @param store * @return @@ -223,7 +223,7 @@ public class PartitionService implements RaftStateListener { } /** - * 产生一个新的分区 + * Creates a new partition * * @param graphName * @return @@ -232,7 +232,7 @@ public class PartitionService implements RaftStateListener { Metapb.Graph graph = partitionMeta.getAndCreateGraph(graphName); int partitionSize = PartitionUtils.MAX_VALUE / graph.getPartitionCount(); if (PartitionUtils.MAX_VALUE % graph.getPartitionCount() != 0) { - // 有余数,分区除不尽 + // There is a remainder; the range cannot be divided evenly partitionSize++; } @@ -240,12 +240,12 @@ public class PartitionService implements RaftStateListener { long startKey = (long) partitionSize * partitionId; long endKey = (long) partitionSize * (partitionId + 1); - // 检查本地 + // Check locally Metapb.Partition partition = partitionMeta.getPartitionById(graphName, partitionId); if (partition == null) { storeService.allocShards(null, partitionId); - // 分配store + // Assign a store partition = Metapb.Partition.newBuilder() .setId(partitionId) .setVersion(0) @@ -264,11 +264,12 @@ public class PartitionService implements RaftStateListener { } /** - * 计算Key所属的分区,此处采用Hash映射的方法。 + * Compute the partition that the key belongs to, using hash mapping here * - * @param graphName - * @param key - * @return + * @param graphName graph name + * @param key the key + * @return the partition id + * @throws PDException */ protected int getPartitionId(String graphName, byte[] key) throws PDException { int code = PartitionUtils.calcHashcode(key); @@ -277,8 +278,9 @@ public class PartitionService implements RaftStateListener { } /** - * 获取key范围所跨越的所有分区 - * 暂时使用hashcode计算,正常做法,基于key进行查询 + * Gets all partitions spanned by the key range + * Hashcode is used for the calculation for now; the normal approach is to query + * based on the key * * @param graphName * @param startKey @@ -296,7 +298,8 @@ public class PartitionService implements RaftStateListener { partShards.add( Metapb.PartitionShard.newBuilder() .setPartition(partition) - // 此处需要返回正确的leader,暂时默认取第一个 + // The correct leader should be returned here; for now, the + // first one is taken by default .setLeader(storeService.getLeader(partition, 0)) .build() ); @@ -314,7 +317,7 @@ public class PartitionService implements RaftStateListener { } /** - * 更新分区以及图的状态 + * Update the status of partitions and graphs * * @param graph * @param partId @@ -350,7 +353,6 @@ public class PartitionService implements RaftStateListener { partitionMeta.reload(); onPartitionRemoved(partition); - // source中有些是 offline的,删除后,需要更新图的状态 try { Metapb.PartitionState state = Metapb.PartitionState.PState_Normal; for (Metapb.Partition pt : partitionMeta.getPartitions(partition.getGraphName())) { @@ -381,7 +383,7 @@ public class PartitionService implements RaftStateListener { } /** - * 获取图的分区状态 + * Get the partition status of the graph */ public List<Metapb.PartitionStats> getPartitionStatus(String
graphName) throws PDException { @@ -389,7 +391,7 @@ public class PartitionService implements RaftStateListener { } /** - * 返回图的信息 + * Returns the information of the graph */ public List<Metapb.Graph> getGraphs() throws PDException { return partitionMeta.getGraphs(); @@ -400,7 +402,7 @@ public class PartitionService implements RaftStateListener { } /** - * 删除图以及图的所有分区 + * Delete the diagram and all partitions of the diagram */ public Metapb.Graph delGraph(String graphName) throws PDException { log.info("delGraph {}", graphName); @@ -414,7 +416,7 @@ public class PartitionService implements RaftStateListener { } /** - * 修改图信息,需要通知到store + * To modify the graph information, you need to notify the store */ public synchronized Metapb.Graph updateGraph(Metapb.Graph graph) throws PDException { Metapb.Graph lastGraph = partitionMeta.getAndCreateGraph(graph.getGraphName()); @@ -439,12 +441,11 @@ public class PartitionService implements RaftStateListener { .build(); partitionMeta.updateGraph(graph); - // 分区数发生改变 + // The number of partitions has changed if (lastGraph.getPartitionCount() != graph.getPartitionCount()) { log.info("updateGraph graph: {}, partition count changed from {} to {}", graph.getGraphName(), lastGraph.getPartitionCount(), graph.getPartitionCount()); - // TODO 修改图的分区数,需要进行数据迁移。 } return graph; } @@ -468,7 +469,7 @@ public class PartitionService implements RaftStateListener { } /** - * 存储被下线,迁移分区数据 + * The storage is taken offline and the partition data is migrated * * @param store */ @@ -491,7 +492,7 @@ public class PartitionService implements RaftStateListener { } /** - * 存储被下线,迁移分区数据 + * The storage is taken offline and the partition data is migrated */ public synchronized void shardOffline(Metapb.Partition partition, long storeId) { try { @@ -537,7 +538,7 @@ public class PartitionService implements RaftStateListener { } /** - * 重新分配shard + * Reassign shards * * @param graph * @param partition @@ -571,7 +572,7 @@ public class PartitionService implements RaftStateListener { } /** - * 迁移分区副本 + * Migrate partition copies */ public synchronized void movePartitionsShard(Integer partitionId, long fromStore, long toStore) { @@ -599,7 +600,7 @@ public class PartitionService implements RaftStateListener { // storeService.updateShardGroup(partitionId, shards, -1, -1); // storeService.onShardGroupStatusChanged(shardGroup, newShardGroup); fireChangeShard(partition, shards, ConfChangeType.CONF_CHANGE_TYPE_ADJUST); - // shard group和 graph无关,迁移一个就够了 + // Shard groups have nothing to do with Graph, just one is enough break; } } catch (PDException e) { @@ -608,9 +609,9 @@ public class PartitionService implements RaftStateListener { } /** - * 把集群中所有的分区,拆成split + * Split all partitions in the cluster into splits * - * @param splits 拆分分区 + * @param splits Split partitions */ public synchronized void splitPartition(List<KVPair<Integer, Integer>> splits) throws PDException { @@ -631,13 +632,12 @@ public class PartitionService implements RaftStateListener { } /** - * 分区分裂, 把一个图拆分到N 个 + * Partition splitting, splitting a graph into N pieces * * @param graph graph * @param toCount target count * @throws PDException */ - public synchronized void splitPartition(Metapb.Graph graph, int toCount) throws PDException { var partitionCount = getPartitions(graph.getGraphName()).size(); @@ -656,7 +656,7 @@ public class PartitionService implements RaftStateListener { " current partition count"); } - // 由于是整数倍数,扩充因子为 toCount / current count + // Since it is an integer multiple,The enrichment factor is toCount / 
current count var splitCount = toCount / partitionCount; var list = new ArrayList<KVPair<Integer, Integer>>(); for (int i = 0; i < partitionCount; i++) { @@ -677,7 +677,7 @@ public class PartitionService implements RaftStateListener { splits.sort(Comparator.comparing(KVPair::getKey)); log.info("split partition, graph: {}, splits:{}", graph, splits); - // 从最后一个partition下标开始 + // Start from the last partition index var i = getPartitions(graph.getGraphName()).size(); for (var pair : splits) { @@ -688,7 +688,7 @@ public class PartitionService implements RaftStateListener { long splitLen = (partition.getEndKey() - partition.getStartKey()) / splitCount; List<Metapb.Partition> newPartitions = new ArrayList<>(); - // 第一个分区也就是原分区 + // The first partition is the original partition newPartitions.add(partition.toBuilder() .setStartKey(partition.getStartKey()) .setEndKey(partition.getStartKey() + splitLen) @@ -724,8 +724,10 @@ public class PartitionService implements RaftStateListener { if (j != 0) { partitionMeta.updatePartition(newPartition); } - // 创建shard group,如果为空,则按照partition的shard group为蓝本,去创建,保证在一个机器上 - // 如果存在,则由于各个图的分区数量不一样,需要store端复制到其他机器上 + // Create a shard group; if none exists, create it based on the shard + // group of the original partition so that it stays on the same machines + // If it already exists, since the number of partitions differs between graphs, + // the store side needs to copy it to other machines var shardGroup = storeService.getShardGroup(newPartition.getId()); if (shardGroup == null) { shardGroup = storeService.getShardGroup(partition.getId()).toBuilder() @@ -735,7 +737,7 @@ public class PartitionService implements RaftStateListener { updateShardGroupCache(shardGroup); } - // 做shard list的检查 + // Check the shard list if (shardGroup.getShardsCount() != pdConfig.getPartition().getShardCount()) { storeService.reallocShards(shardGroup); } @@ -746,11 +748,12 @@ public class PartitionService implements RaftStateListener { .build(); fireSplitPartition(partition, splitPartition); - // 修改Partition状态为下线,任务完成后恢复为上线 + // Change the partition status to Offline, and restore it to Online + // after the task is completed updatePartitionState(partition.getGraphName(), partition.getId(), Metapb.PartitionState.PState_Offline); - // 记录事务 + // Record the task var task = MetaTask.Task.newBuilder().setPartition(partition) .setSplitPartition(splitPartition) .build(); @@ -761,8 +764,8 @@ public class PartitionService implements RaftStateListener { } /** - * 转移leader到其他shard上. 
- * 转移一个partition即可 + * transfer leader to other shard 。 + * Just transfer a partition */ public void transferLeader(Integer partId, Metapb.Shard shard) { try { @@ -784,16 +787,17 @@ public class PartitionService implements RaftStateListener { } /** - * 分区合并,将整个集群的分区数,合并到toCount个 + * // todo : Check the corresponding store group and check the logic + * Partition merging: Merges the number of partitions in the entire cluster into toCount * - * @param toCount 目标分区数 + * @param toCount The number of partitions to be targeted * @throws PDException when query errors */ public void combinePartition(int toCount) throws PDException { int shardsTotalCount = getShardGroupCount(); for (var graph : getGraphs()) { - // 对所有大于toCount分区的图,都进行缩容 + // All graphs larger than the toCount partition are scaled in if (graph.getPartitionCount() > toCount) { combineGraphPartition(graph, toCount, shardsTotalCount); } @@ -801,19 +805,18 @@ public class PartitionService implements RaftStateListener { } /** - * 针对单个图,进行分区合并 + * For a single graph, perform partition merging * * @param graphName the name of the graph * @param toCount the target partition count * @throws PDException when query errors */ - public void combineGraphPartition(String graphName, int toCount) throws PDException { combineGraphPartition(getGraph(graphName), toCount, getShardGroupCount()); } /** - * 单图合并的内部实现 + * Internal implementation of single-graph merging * * @param graph the name of the graph * @param toCount the target partition count @@ -845,22 +848,22 @@ public class PartitionService implements RaftStateListener { throw new PDException(3, "Graph Combine process exists"); } - // 按照 key start 排序,合并后的key range 是连续的 + // According to key start sort var partitions = getPartitions(graph.getGraphName()).stream() .sorted(Comparator.comparing( Metapb.Partition::getStartKey)) .collect(Collectors.toList()); - // 分区编号不一定是连续的 + // Partition numbers do not have to be sequential var sortPartitions = getPartitions(graph.getGraphName()) .stream() .sorted(Comparator.comparing(Metapb.Partition::getId)) .collect(Collectors.toList()); var groupSize = partitions.size() / toCount; // merge group size - // 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 共12个分区, 合并成4个 - // 方案:0,1,2 => 0, 3,4,5 -> 1, 6,7,8 ->2, 9,10,11 -> 3 - // 保证分区的连续性. 
+ // 12 partitions (0-11) in total, merged into 4 + // Scheme: 0,1,2 -> 0, 3,4,5 -> 1, 6,7,8 -> 2, 9,10,11 -> 3 + // Ensure the continuity of partitions for (int i = 0; i < toCount; i++) { var startKey = partitions.get(i * groupSize).getStartKey(); var endKey = partitions.get(i * groupSize + groupSize - 1).getEndKey(); @@ -874,7 +877,7 @@ public class PartitionService implements RaftStateListener { for (int j = 0; j < groupSize; j++) { var partition = partitions.get(i * groupSize + j); - // 分区id相同,就跳过 + // If the partition ID is the same, skip it if (i == partition.getId()) { continue; } @@ -888,12 +891,12 @@ public class PartitionService implements RaftStateListener { .setKeyEnd(partition.getEndKey()) .build(); taskInfoMeta.addMovePartitionTask(partition, movePartition); - // source 下线 + // Take the source offline updatePartitionState(partition.getGraphName(), partition.getId(), Metapb.PartitionState.PState_Offline); fireMovePartition(partition, movePartition); } - // target 下线 + // Take the target offline updatePartitionState(targetPartition.getGraphName(), targetPartition.getId(), Metapb.PartitionState.PState_Offline); } @@ -902,7 +905,7 @@ public class PartitionService implements RaftStateListener { } /** - * 通过 storeService 获取 raft group 总数 + * Get the total raft group count from storeService * * @return the count of raft groups */ @@ -917,30 +920,31 @@ public class PartitionService implements RaftStateListener { } /** - * 判断图分区是否能够从from合并到to个 + * Determine whether the graph partitions can be merged from fromCount to toCount * - * @param fromCount 现在的分区数 - * @param toCount 目标分区数 + * @param fromCount the current number of partitions + * @param toCount the target number of partitions * @return true when available , or otherwise */ private boolean checkTargetCount(int fromCount, int toCount, int shardCount) { - // 要介于 1 ~ N 中间,而且可以整除 + // It must be between 1 and N, and fromCount must be divisible by toCount return toCount >= 1 && toCount < fromCount && fromCount % toCount == 0 && toCount < shardCount; } /** - * 处理分区心跳, 记录Leader信息 - * 检查term和version,比较是否是最新的消息 + * Process partition heartbeats and record leader information + * Check the term and version to see if it's the latest message * * @param stats */ public void partitionHeartbeat(Metapb.PartitionStats stats) throws PDException { Metapb.ShardGroup shardGroup = storeService.getShardGroup(stats.getId()); - // shard group version changes - // (shard group 由pd控制, 在分裂等操作后,可能出现短暂不一致的情况,以pd为准) - // store控制shard leader + // shard group version changes or leader changes + // (The shard group is controlled by the PD, and there may be brief inconsistencies after + // operations such as splitting; the PD prevails) + // The store uploads the final raft group data if (shardGroup != null && (shardGroup.getVersion() < stats.getLeaderTerm() || shardGroup.getConfVer() < stats.getConfVer())) { @@ -954,13 +958,13 @@ public class PartitionService implements RaftStateListener { // partitionMeta.getAndCreateGraph(partition.getGraphName()); checkShardState(partition, stats); } - // 统计信息 + // statistics partitionMeta.updatePartitionStats(stats.toBuilder() .setTimestamp(System.currentTimeMillis()).build()); } /** - * 检查shard状态,离线shard影响到分区状态 + * Check the shard status; an offline shard affects the partition status * * @param stats */ @@ -1000,7 +1004,7 @@ public class PartitionService implements RaftStateListener { } /** - * 发起改变shard命令 + * Initiates the Change Shard command * * @param changeType */ @@ -1028,7 +1032,7 @@ public class PartitionService implements RaftStateListener { } /** - * 发送分区分裂消息 + * Send a partition split message * * @param 
partition */ @@ -1045,7 +1049,7 @@ public class PartitionService implements RaftStateListener { } /** - * 发送Leader切换消息 + * Send a Leader Switchover message */ protected void fireTransferLeader(Metapb.Partition partition, TransferLeader transferLeader) { log.info("fireTransferLeader partition: {}-{}, leader :{}", @@ -1060,10 +1064,10 @@ public class PartitionService implements RaftStateListener { } /** - * 发送分区移动数据的消息 + * Send a message to the partition to move data * - * @param partition 原分区 - * @param movePartition 目标分区,包含 key range + * @param partition the original partition + * @param movePartition the target partition, containing the key range */ protected void fireMovePartition(Metapb.Partition partition, MovePartition movePartition) { log.info("fireMovePartition partition: {} -> {}", @@ -1106,7 +1110,7 @@ public class PartitionService implements RaftStateListener { } /** - * 处理图迁移任务 + * Handle graph migration tasks * * @param task */ @@ -1125,7 +1129,7 @@ public class PartitionService implements RaftStateListener { task.getPartition().getId(), task.getMovePartition().getTargetPartition().getId(), task.getState()); - // 已经被处理(前面有failed) + // Already processed (a failure occurred earlier) if (pdMetaTask != null) { var newTask = pdMetaTask.toBuilder().setState(task.getState()).build(); taskInfoMeta.updateMovePartitionTask(newTask); @@ -1153,10 +1157,10 @@ public class PartitionService implements RaftStateListener { } /** - * 当所有的迁移子任务成功: - * 1. 发送清理source分区指令 - * 2. 设置target上线, 更新key range, 更新 graph partition count - * 3. 删除move task,任务结束 + * When all migration subtasks succeed: + * 1. Send the instruction to clean up the source partitions + * 2. Set the target online, update the key range, and update the graph partition count + * 3. Delete the move task; the task ends * * @param subTasks all move sub tasks * @param graphName graph name @@ -1175,20 +1179,21 @@ public class PartitionService implements RaftStateListener { for (MetaTask.Task subTask : subTasks) { var source = subTask.getPartition(); var targetPartition = subTask.getMovePartition().getTargetPartition(); - // 是否处理过 + // Whether it has already been handled if (!targetPartitionIds.contains(targetPartition.getId())) { - // 更新range + // Update the range var old = getPartitionById(targetPartition.getGraphName(), targetPartition.getId()); var newPartition = Metapb.Partition.newBuilder(old) .setStartKey(targetPartition.getStartKey()) .setEndKey(targetPartition.getEndKey()) .setState(Metapb.PartitionState.PState_Normal) .build(); - // 在 key range之前更新,避免store没有分区的问题, 需要到pd查询 + // Update before the key range change to avoid the problem that the store does + // not have the partition and has to query the pd updatePartition(List.of(newPartition)); targetPartitions.add(newPartition); - // 发送key range 变更消息 + // Send the key range change message PartitionKeyRange partitionKeyRange = PartitionKeyRange.newBuilder() .setPartitionId(old.getId()) .setKeyStart( @@ -1196,12 +1201,13 @@ public class PartitionService implements RaftStateListener { .setKeyEnd( targetPartition.getEndKey()) .build(); - // 通知store + // Notify the store fireChangePartitionKeyRange( old.toBuilder().setState(Metapb.PartitionState.PState_Normal).build(), partitionKeyRange); - // 将 target 设置为上线. source 理论上可能被删掉,所以不处理 + // Set the target online. The source could theoretically be deleted, so it is + // not processed updatePartitionState(newPartition.getGraphName(), newPartition.getId(), Metapb.PartitionState.PState_Normal); @@ -1213,7 +1219,9 @@ public class PartitionService implements RaftStateListener { .setKeyEnd(source.getEndKey()) .setCleanType( CleanType.CLEAN_TYPE_EXCLUDE_RANGE) - // target 的 partition只需要清理数据,不需要删除分区 + // The target partition only needs to + // clean up the data; it does not need + // to be deleted .setDeletePartition(!deleteFlags.contains( source.getId())) .build(); @@ -1226,14 +1234,14 @@ public class PartitionService implements RaftStateListener { CleanType.CLEAN_TYPE_EXCLUDE_RANGE, cleanPartition.getDeletePartition()); - // 清理掉被移动分区的数据 + // Clean up the data of the moved partition fireCleanPartition(source, cleanPartition); } - // 更新key range, 本地更新,client更新 + // Update the key range, both locally and on the client // updatePartition(targetPartitions); - // 更新target 分区状态,source 可能被删掉,所以不处理 + // Update the target partition status; the source may have been deleted, so it is not processed targetPartitions.forEach(p -> { try { updatePartitionState(p.getGraphName(), p.getId(), @@ -1245,21 +1253,21 @@ public class PartitionService implements RaftStateListener { partitionMeta.reload(); - // 更新graph partition count + // Update the graph partition count var graph = getGraph(graphName).toBuilder() .setPartitionCount(targetPartitionIds.size()) .build(); updateGraph(graph); - // 事务完成 + // The transaction is complete taskInfoMeta.removeMoveTaskPrefix(graphName); } /** - * 如果缩容任务有失败的,回滚合并操作 - * 1. 清理原来的target 分区,将迁移过来的数据再删掉 - * 2. 将source/target 分区设置为上线 - * 3. 删除task,任务结束 + * If the scale-in task fails, roll back the merge operation + * 1. Clean up the original target partitions and delete the migrated data + * 2. Set the source/target partitions online + * 3. Delete the task; the task ends * * @param graphName graph name * @param taskInfoMeta task info meta */ private void handleMoveTaskIfFailed(String graphName, TaskInfoMeta taskInfoMeta) throws PDException { - // 发送清理target分区的任务, 回滚target分区 + // Send tasks to clean up the target partitions and roll back the target partitions var targetPartitionIds = new HashSet<Integer>(); for (var metaTask : taskInfoMeta.scanMoveTask(graphName)) { var source = metaTask.getPartition(); - // 设置 source 为上线 + // Set the source online updatePartitionState(source.getGraphName(), source.getId(), Metapb.PartitionState.PState_Normal); var movedPartition = metaTask.getMovePartition().getTargetPartition(); @@ -1294,16 +1302,16 @@ public class PartitionService implements RaftStateListener { fireCleanPartition(targetPartition, cleanPartition); targetPartitionIds.add(targetPartition.getId()); - // 设置target 上线 + // Set the target online updatePartitionState(targetPartition.getGraphName(), targetPartition.getId(), Metapb.PartitionState.PState_Normal); } - // 清理掉任务列表 + // Clean up the task list taskInfoMeta.removeMoveTaskPrefix(graphName); } /** - * 处理clean task + * Handle the clean task * * @param task clean task */ @@ -1316,7 +1324,7 @@ public class PartitionService implements RaftStateListener { task.getState() ); - // 如果失败重试? + // If it fails, try again? 
} public synchronized void handleSplitTask(MetaTask.Task task) throws PDException { @@ -1366,7 +1374,7 @@ public class PartitionService implements RaftStateListener { var source = subTask.getPartition(); var newPartition = subTask.getSplitPartition().getNewPartitionList().get(0); - // 发送key range 变更消息 + // Send key range change messages PartitionKeyRange partitionKeyRange = PartitionKeyRange.newBuilder() .setPartitionId(source.getId()) .setKeyStart( @@ -1374,16 +1382,18 @@ public class PartitionService implements RaftStateListener { .setKeyEnd( newPartition.getEndKey()) .build(); - // 通知store + // Notice store fireChangePartitionKeyRange(source, partitionKeyRange); - // 将 target 设置为上线. source 理论上可能被删掉,所以不处理 + // Set Target to go live. source could theoretically be deleted, so it is not processed CleanPartition cleanPartition = CleanPartition.newBuilder() .setKeyStart(newPartition.getStartKey()) .setKeyEnd(newPartition.getEndKey()) .setCleanType( CleanType.CLEAN_TYPE_KEEP_RANGE) - // target 的 partition只需要清理数据,不需要删除分区 + // The partition of the target only + // needs to clean up the data, and does + // not need to delete the partition .setDeletePartition(false) .build(); @@ -1397,7 +1407,7 @@ public class PartitionService implements RaftStateListener { fireCleanPartition(source, cleanPartition); - // 更新partition state + // renewal partition state for (var sp : subTask.getSplitPartition().getNewPartitionList()) { partitions.add( sp.toBuilder().setState(Metapb.PartitionState.PState_Normal).build()); @@ -1419,13 +1429,13 @@ public class PartitionService implements RaftStateListener { storeService.getShardGroups().size()); } - // 更新graph partition count + // renewal graph partition count var newGraph = graph.toBuilder() .setPartitionCount(graph.getPartitionCount() + addedPartitions) .build(); updateGraph(newGraph); - // 事务完成 + // The transaction is complete taskInfoMeta.removeSplitTaskPrefix(graphName); } @@ -1452,18 +1462,18 @@ public class PartitionService implements RaftStateListener { updatePartitionState(partition.getGraphName(), partition.getId(), Metapb.PartitionState.PState_Normal); } - // 清理掉任务列表 + // Clean up the task list taskInfoMeta.removeSplitTaskPrefix(graphName); } /** - * 接收到Leader改变的消息 - * 更新图状态,触发分区变更 + * todo : What is the impact of partition changes?? 
+ * Received a message that the leader has changed + * Update the status of the graph and trigger a partition change */ protected void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) { log.info("onPartitionChanged partition: {}", partition); if (old != null && old.getState() != partition.getState()) { - // 状态改变,重置图的状态 Metapb.PartitionState state = Metapb.PartitionState.PState_Normal; for (Metapb.Partition pt : partitionMeta.getPartitions(partition.getGraphName())) { if (pt.getState().getNumber() > state.getNumber()) { @@ -1491,7 +1501,7 @@ public class PartitionService implements RaftStateListener { } /** - * PD的leader发生改变,需要重新加载数据 + * The leader of the PD has changed and the data needs to be reloaded */ @Override public void onRaftLeaderChanged() { @@ -1503,31 +1513,17 @@ public class PartitionService implements RaftStateListener { } } - /** - * 分区状态发生改变,需要传播到图、集群 - * - * @param graph - * @param partId - * @param state - */ public void onPartitionStateChanged(String graph, int partId, Metapb.PartitionState state) throws PDException { updatePartitionState(graph, partId, state); } - /** - * Shard状态发生改变,需要传播到分区、图、集群 - * - * @param graph - * @param partId - * @param state - */ public void onShardStateChanged(String graph, int partId, Metapb.PartitionState state) { } /** - * 发送rocksdb compaction 消息 + * Send rocksdb compaction message * * @param partId * @param tableName diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java index fea0ce35d..690b60362 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java @@ -20,7 +20,7 @@ package org.apache.hugegraph.pd; import org.apache.hugegraph.pd.grpc.Metapb; /** - * 分区状态监听 + * Partition status listeners */ public interface PartitionStatusListener { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java index b75532634..50d810a51 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java @@ -48,14 +48,13 @@ import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; /** - * HgStore注册、保活管理类 + * Hg Store registration and keep-alive management */ @Slf4j public class StoreNodeService { private static final Long STORE_HEART_BEAT_INTERVAL = 30000L; private static final String graphSpaceConfPrefix = "HUGEGRAPH/hg/GRAPHSPACE/CONF/"; - // Store状态监听 private final List<StoreStatusListener> statusListeners; private final List<ShardGroupStatusListener> shardGroupStatusListeners; private final StoreInfoMeta storeInfoMeta; @@ -96,7 +95,6 @@ public class StoreNodeService { @Override public void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) { if (old != null && old.getState() != partition.getState()) { - // 状态改变,重置集群状态 try { List<Metapb.Partition> partitions = partitionService.getPartitionById(partition.getId()); @@ -128,7 +126,7 @@ public class StoreNodeService { } /** - * 集群是否准备就绪 + * Whether the cluster is ready or not * * @return */ @@ -138,36 +136,37 @@ public class StoreNodeService { } /** - * Store注册,记录Store的ip地址,首次注册需要生成store_ID + * Store registration, record the IP address of the 
Store, and the first registration needs + * to generate a store_ID * * @param store */ public Metapb.Store register(Metapb.Store store) throws PDException { if (store.getId() == 0) { - // 初始注册,生成新id,保证Id不重复。 + // Initial registration, generate a new ID, and ensure that the ID is not duplicated. store = newStoreNode(store); } if (!storeInfoMeta.storeExists(store.getId())) { log.error("Store id {} does not belong to this PD, address = {}", store.getId(), store.getAddress()); - // storeId不存在,抛出异常 + // storeId does not exist, an exception is thrown throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE, String.format("Store id %d doest not exist.", store.getId())); } - // 如果store状态为Tombstone拒绝注册。 + // If the store status is Tombstone, the registration is denied. Metapb.Store lastStore = storeInfoMeta.getStore(store.getId()); if (lastStore.getState() == Metapb.StoreState.Tombstone) { log.error("Store id {} has been removed, Please reinitialize, address = {}", store.getId(), store.getAddress()); - // storeId不存在,抛出异常 + // storeId does not exist, an exception is thrown throw new PDException(Pdpb.ErrorType.STORE_HAS_BEEN_REMOVED_VALUE, String.format("Store id %d has been removed. %s", store.getId(), store.getAddress())); } - // offline或者up,或者在初始激活列表中,自动上线 + // offline or up, or in the initial activation list, go live automatically Metapb.StoreState storeState = lastStore.getState(); if (storeState == Metapb.StoreState.Offline || storeState == Metapb.StoreState.Up || inInitialStoreList(store)) { @@ -189,33 +188,34 @@ public class StoreNodeService { long current = System.currentTimeMillis(); boolean raftChanged = false; - // 上线状态的Raft Address 发生了变更 + // On-line status Raft Address there has been a change if (!Objects.equals(lastStore.getRaftAddress(), store.getRaftAddress()) && storeState == Metapb.StoreState.Up) { - // 时间间隔太短,而且raft有变更,则认为是无效的store + // If the time interval is too short and the raft changes, it is considered an + // invalid store if (current - lastStore.getLastHeartbeat() < STORE_HEART_BEAT_INTERVAL * 0.8) { throw new PDException(Pdpb.ErrorType.STORE_PROHIBIT_DUPLICATE_VALUE, String.format("Store id %d may be duplicate. 
addr: %s", store.getId(), store.getAddress())); } else if (current - lastStore.getLastHeartbeat() > STORE_HEART_BEAT_INTERVAL * 1.2) { - // 认为发生了变更 + // It is considered that a change has occurred raftChanged = true; } else { - // 等待下次注册 + // Wait for the next registration return Metapb.Store.newBuilder(store).setId(0L).build(); } } - // 存储store信息 + // Store information storeInfoMeta.updateStore(store); if (storeState == Metapb.StoreState.Up) { - // 更新store 活跃状态 + // Update the store active status storeInfoMeta.keepStoreAlive(store); onStoreStatusChanged(store, Metapb.StoreState.Offline, Metapb.StoreState.Up); checkStoreStatus(); } - // 等store信息保存后,再发送变更 + // Wait for the store information to be saved before sending the changes if (raftChanged) { onStoreRaftAddressChanged(store); } @@ -229,7 +229,7 @@ public class StoreNodeService { } /** - * 产生一个新的store对象 + * Creates a new store object * * @param store * @return @@ -249,7 +249,7 @@ public class StoreNodeService { } /** - * 根据store_id返回Store信息 + * Returns Store information based on store_id * * @param id * @return @@ -265,7 +265,7 @@ public class StoreNodeService { } /** - * 更新Store信息,检测Store状态的变化,通知到Hugestore + * Update the store information, detect the change of store status, and notify Hugestore */ public synchronized Metapb.Store updateStore(Metapb.Store store) throws PDException { log.info("updateStore storeId: {}, address: {}, state: {}", store.getId(), @@ -290,10 +290,10 @@ public class StoreNodeService { storeInfoMeta.updateStore(store); if (store.getState() != Metapb.StoreState.Unknown && store.getState() != lastStore.getState()) { - // 如果希望将store下线 + // If you want to take the store offline if (store.getState() == Metapb.StoreState.Exiting) { if (lastStore.getState() == Metapb.StoreState.Exiting) { - //如果已经是下线中的状态,则不作进一步处理 + // If it is already in the offline state, no further processing will be made return lastStore; } @@ -302,19 +302,23 @@ public class StoreNodeService { activeStores.forEach(s -> { storeMap.put(s.getId(), s); }); - //如果store已经离线,直接从活跃中删除,如果store在线,暂时不从活跃中删除,等把状态置成Tombstone的时候再删除 + // If the store is offline, delete it directly from active, and if the store is + // online, temporarily delete it from active, and then delete it when the status + // is set to Tombstone if (!storeMap.containsKey(store.getId())) { log.info("updateStore removeActiveStores store {}", store.getId()); storeInfoMeta.removeActiveStore(store); } storeTurnoff(store); - } else if (store.getState() == Metapb.StoreState.Offline) { //监控到store已经离线,从活跃中删除 + } else if (store.getState() == Metapb.StoreState.Offline) { + // Monitor that the store has gone offline and is removed from the active storeInfoMeta.removeActiveStore(store); } else if (store.getState() == Metapb.StoreState.Tombstone) { - // 状态发生改变,Store关机,修改shardGroup,进行副本迁移 + // When the status changes, the store is shut down, the shardGroup is modified, + // and the replica is migrated log.info("updateStore removeActiveStores store {}", store.getId()); storeInfoMeta.removeActiveStore(store); - // 存储下线 + // Storage goes offline storeTurnoff(store); } else if (store.getState() == Metapb.StoreState.Up) { storeInfoMeta.keepStoreAlive(store); @@ -326,13 +330,13 @@ public class StoreNodeService { } /** - * store被关机,重新分配shardGroup的shard + * The shard of the shardGroup is reassigned * * @param store * @throws PDException */ public synchronized void storeTurnoff(Metapb.Store store) throws PDException { - // 遍历ShardGroup,重新分配shard + // Traverse ShardGroup,redistribution for (Metapb.ShardGroup group : 
getShardGroupsByStore(store.getId())) { Metapb.ShardGroup.Builder builder = Metapb.ShardGroup.newBuilder(group); builder.clearShards(); @@ -346,7 +350,8 @@ public class StoreNodeService { } /** - * 根据图名返回stores信息,如果graphName为空,返回所有store信息 + * Returns stores information based on the graph name, and if graphName is empty, all store + * information is returned * * @throws PDException */ @@ -391,7 +396,7 @@ public class StoreNodeService { } /** - * 返回活跃的store + * Returns the active store * * @param graphName * @return @@ -420,16 +425,19 @@ public class StoreNodeService { } /** - * 给partition分配store,根据图的配置,决定分配几个peer - * 分配完所有的shards,保存ShardGroup对象(store不变动,只执行一次) + * todo : New logic + * Assign a store to the partition and decide how many peers to allocate according to the + * configuration of the graph + * After allocating all the shards, save the ShardGroup object (store does not change, only + * executes once) */ public synchronized List<Metapb.Shard> allocShards(Metapb.Graph graph, int partId) throws PDException { - // 多图共用raft分组,因此分配shard只依赖partitionId. - // 图根据数据大小可以设置分区的数量,但总数不能超过raft分组数量 + // Multiple graphs share raft grouping, so assigning shard only depends on partitionId. + // The number of partitions can be set based on the size of the data, but the total + // number cannot exceed the number of raft groups if (storeInfoMeta.getShardGroup(partId) == null) { - // 获取活跃的store key - // 根据 partionID计算store + // Get active store key List<Metapb.Store> stores = storeInfoMeta.getActiveStores(); if (stores.size() == 0) { @@ -445,17 +453,18 @@ public class StoreNodeService { int shardCount = pdConfig.getPartition().getShardCount(); shardCount = Math.min(shardCount, stores.size()); - //两个shard无法选出leader - // 不能为0 + // Two shards could not elect a leader + // It cannot be 0 if (shardCount == 2 || shardCount < 1) { shardCount = 1; } - // 一次创建完所有的ShardGroup,保证初始的groupID有序,方便人工阅读 + // All ShardGroups are created at one time to ensure that the initial groupIDs are + // orderly and easy for humans to read for (int groupId = 0; groupId < pdConfig.getConfigService().getPartitionCount(); groupId++) { - int storeIdx = groupId % stores.size(); //store分配规则,简化为取模 + int storeIdx = groupId % stores.size(); // Assignment rules, simplified to modulo List<Metapb.Shard> shards = new ArrayList<>(); for (int i = 0; i < shardCount; i++) { Metapb.Shard shard = @@ -464,7 +473,7 @@ public class StoreNodeService { Metapb.ShardRole.Follower) // .build(); shards.add(shard); - storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx; // 顺序选择 + storeIdx = (storeIdx + 1) >= stores.size() ? 
0 : ++storeIdx; // Sequential } Metapb.ShardGroup group = Metapb.ShardGroup.newBuilder() @@ -484,8 +493,8 @@ public class StoreNodeService { } /** - * 根据graph的shard_count,重新分配shard - * 发送变更change shard指令 + * Based on the shard_count of the graph, reallocate shards + * Send change shard */ public synchronized List<Metapb.Shard> reallocShards(Metapb.ShardGroup shardGroup) throws PDException { @@ -505,8 +514,8 @@ public class StoreNodeService { int shardCount = pdConfig.getPartition().getShardCount(); shardCount = Math.min(shardCount, stores.size()); if (shardCount == 2 || shardCount < 1) { - // 两个shard无法选出leader - // 不能为0 + // Two shards could not elect a leader + // It cannot be 0 shardCount = 1; } @@ -514,12 +523,12 @@ public class StoreNodeService { shards.addAll(shardGroup.getShardsList()); if (shardCount > shards.size()) { - // 需要增加shard + // Need to add shards log.info("reallocShards ShardGroup {}, add shards from {} to {}", shardGroup.getId(), shards.size(), shardCount); - int storeIdx = shardGroup.getId() % stores.size(); //store分配规则,简化为取模 + int storeIdx = shardGroup.getId() % stores.size(); for (int addCount = shardCount - shards.size(); addCount > 0; ) { - // 检查是否已经存在 + // Check if it already exists if (!isStoreInShards(shards, stores.get(storeIdx).getId())) { Metapb.Shard shard = Metapb.Shard.newBuilder() .setStoreId(stores.get(storeIdx).getId()) @@ -527,10 +536,10 @@ public class StoreNodeService { shards.add(shard); addCount--; } - storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx; // 顺序选择 + storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx; } } else if (shardCount < shards.size()) { - // 需要减shard + // Need to reduce shard log.info("reallocShards ShardGroup {}, remove shards from {} to {}", shardGroup.getId(), shards.size(), shardCount); @@ -566,7 +575,7 @@ public class StoreNodeService { } /** - * 根据partition的数量,分配group shard + * According to the number of partitions,distribute group shard * * @param groups list of (partition id, count) * @return total groups @@ -574,7 +583,7 @@ public class StoreNodeService { public synchronized int splitShardGroups(List<KVPair<Integer, Integer>> groups) throws PDException { int sum = groups.stream().map(pair -> pair.getValue()).reduce(0, Integer::sum); - // shard group 太大 + // shard group is too big if (sum > getActiveStores().size() * pdConfig.getPartition().getMaxShardsPerStore()) { throw new PDException(Pdpb.ErrorType.Too_Many_Partitions_Per_Store_VALUE, "can't satisfy target shard group count"); @@ -586,8 +595,9 @@ public class StoreNodeService { } /** - * 分配shard group,为分裂做准备 + * Alloc shard group, prepare for the split * + * @param * @return true * @throws PDException */ @@ -639,10 +649,10 @@ public class StoreNodeService { } /** - * 通知 store 进行shard group的重建操作 + * Notify the Store to rebuild the shard group * * @param groupId raft group id - * @param shards shard list: 如果为空,则删除对应的partition engine + * @param shards shard list: If it is empty, delete the corresponding one partition engine */ public void shardGroupOp(int groupId, List<Metapb.Shard> shards) throws PDException { @@ -665,7 +675,7 @@ public class StoreNodeService { } /** - * 删除 shard group + * Delete shard group * * @param groupId shard group id */ @@ -677,7 +687,7 @@ public class StoreNodeService { onShardGroupStatusChanged(group, null); - // 修正store的分区数. (分区合并导致) + // Fix the number of partitions for the store. 
(Result from partition merge) var shardGroups = getShardGroups(); if (shardGroups != null) { var count1 = pdConfig.getConfigService().getPDConfig().getPartitionCount(); @@ -699,7 +709,7 @@ public class StoreNodeService { } /** - * 接收Store的心跳 + * Receive the heartbeat of the Store * * @param storeStats * @throws PDException @@ -708,7 +718,7 @@ public class StoreNodeService { this.storeInfoMeta.updateStoreStats(storeStats); Metapb.Store lastStore = this.getStore(storeStats.getStoreId()); if (lastStore == null) { - //store不存在 + // store does not exist throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE, String.format("Store id %d does not exist.", storeStats.getStoreId())); @@ -720,14 +730,16 @@ public class StoreNodeService { storeStats.getStoreId())); } Metapb.Store nowStore; - // 如果正在做store下线操作 + // If you are going to take the store offline if (lastStore.getState() == Metapb.StoreState.Exiting) { List<Metapb.Store> activeStores = this.getActiveStores(); Map<Long, Metapb.Store> storeMap = new HashMap<>(); activeStores.forEach(store -> { storeMap.put(store.getId(), store); }); - // 下线的store的分区为0,说明已经迁移完毕,可以下线,如果非0,则迁移还在进行,需要等待 + // If the partition of the offline store is 0, it means that the migration has been + // completed and can be taken offline, if it is not 0, the migration is still in + // progress and you need to wait if (storeStats.getPartitionCount() > 0 && storeMap.containsKey(storeStats.getStoreId())) { nowStore = Metapb.Store.newBuilder(lastStore) @@ -799,9 +811,9 @@ public class StoreNodeService { } /** - * 检查集群健康状态 - * 活跃机器数是否大于最小阈值 - * 分区shard在线数已否过半 * + * Check the cluster health status + * Whether the number of active machines is greater than the minimum threshold + * The number of partition shards online has exceeded half */ public synchronized void checkStoreStatus() { Metapb.ClusterStats.Builder builder = Metapb.ClusterStats.newBuilder() @@ -821,7 +833,7 @@ public class StoreNodeService { }); if (builder.getState() == Metapb.ClusterState.Cluster_OK) { - // 检查每个分区的在线shard数量是否大于半数 + // Check whether the number of online shards for each partition is greater than half for (Metapb.ShardGroup group : this.getShardGroups()) { int count = 0; for (Metapb.Shard shard : group.getShardsList()) { @@ -881,9 +893,10 @@ public class StoreNodeService { } /** - * 检查当前store是否可下线 - * 活跃机器数小于等于最小阈值,不可下线 - * 分区shard在线数不超过半数, 不可下线 + * Check whether the current store can be discontinued + * If the number of active machines is less than or equal to the minimum threshold, they + * cannot be taken offline + * If the number of shards in the partition is not more than half, it cannot be offline */ public boolean checkStoreCanOffline(Metapb.Store currentStore) { try { @@ -900,7 +913,7 @@ public class StoreNodeService { return false; } - // 检查每个分区的在线shard数量是否大于半数 + // Check whether the number of online shards for each partition is greater than half for (Metapb.ShardGroup group : this.getShardGroups()) { int count = 0; for (Metapb.Shard shard : group.getShardsList()) { @@ -920,7 +933,7 @@ public class StoreNodeService { } /** - * 对store上的对rocksdb进行compaction + * Compaction on rocksdb on the store * * @param groupId * @param tableName @@ -929,9 +942,9 @@ public class StoreNodeService { public synchronized void shardGroupsDbCompaction(int groupId, String tableName) throws PDException { - // 通知所有的store,对rocksdb进行compaction + // Notify all stores to compaction rocksdb partitionService.fireDbCompaction(groupId, tableName); - // TODO 异常怎么处理? + // TODO How to deal with exceptions? 
} public Map getQuota() throws PDException { @@ -1037,7 +1050,7 @@ public class StoreNodeService { } /** - * 获得分区的Leader + * Get the leader of the partition * * @param partition * @param initIdx diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java index 889e5a023..9e933a636 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java @@ -44,18 +44,21 @@ import org.apache.hugegraph.pd.raft.RaftEngine; import lombok.extern.slf4j.Slf4j; /** - * 任务调度服务,定时检查Store、资源、分区的状态,及时迁移数据,错误节点 - * 1、监测Store是否离线 - * 2、监测Partition的副本是否正确 - * 3、监测Partition的工作模式是否正确 - * 4、监测Partition是否需要分裂,监测分裂是否完成 + * The task scheduling service checks the status of stores, resources, and partitions on a + * regular basis, migrates data in a timely manner, and errors are on nodes + * 1. Monitor whether the store is offline + * 2. Check whether the replica of the partition is correct + * 3. Check whether the working mode of the partition is correct + * 4. Monitor whether the partition needs to be split and whether the split is completed */ @Slf4j public class TaskScheduleService { private static final String BALANCE_SHARD_KEY = "BALANCE_SHARD_KEY"; - private final long TurnOffAndBalanceInterval = 30 * 60 * 1000; //机器下线30后才能进行动态平衡 - private final long BalanceLeaderInterval = 30 * 1000; // leader平衡时间间隔 + // The dynamic balancing can only be carried out after the machine is offline for 30 minutes + private final long TurnOffAndBalanceInterval = 30 * 60 * 1000; + // leader balances the time interval + private final long BalanceLeaderInterval = 30 * 1000; private final PDConfig pdConfig; private final long clusterStartTime; // private final StoreNodeService storeService; @@ -65,14 +68,12 @@ public class TaskScheduleService { private final StoreMonitorDataService storeMonitorDataService; private final KvService kvService; private final LogService logService; - // 先按照value排序,再按照key排序 private final Comparator<KVPair<Long, Integer>> kvPairComparatorAsc = (o1, o2) -> { if (o1.getValue() == o2.getValue()) { return o1.getKey().compareTo(o2.getKey()); } return o1.getValue().compareTo(o2.getValue()); }; - // 先按照value排序(倒序),再按照key排序(升序) private final Comparator<KVPair<Long, Integer>> kvPairComparatorDesc = (o1, o2) -> { if (o1.getValue() == o2.getValue()) { return o2.getKey().compareTo(o1.getKey()); @@ -157,7 +158,7 @@ public class TaskScheduleService { if (status == Metapb.StoreState.Up) { executor.schedule(() -> { - try { //store 上线后延时1分钟进行leader平衡 + try { balancePartitionLeader(false); } catch (PDException e) { log.error("exception {}", e); @@ -190,7 +191,7 @@ public class TaskScheduleService { } /** - * 巡查所有的store,检查是否在线,存储空间是否充足 + * Inspect all stores to see if they are online and have enough storage space */ public List<Metapb.Store> patrolStores() throws PDException { if (!isLeader()) { @@ -198,7 +199,7 @@ public class TaskScheduleService { } List<Metapb.Store> changedStores = new ArrayList<>(); - // 检查store在线状态 + // Check your store online status List<Metapb.Store> stores = storeService.getStores(""); Map<Long, Metapb.Store> activeStores = storeService.getActiveStores("") .stream().collect( @@ -208,7 +209,7 @@ public class TaskScheduleService { if ((store.getState() == Metapb.StoreState.Up || store.getState() == Metapb.StoreState.Unknown) && 
!activeStores.containsKey(store.getId())) { - // 不在线,修改状态为离线 + // If you are not online, the modification status is offline changeStore = Metapb.Store.newBuilder(store) .setState(Metapb.StoreState.Offline) .build(); @@ -220,8 +221,8 @@ public class TaskScheduleService { pdConfig.getStore().getMaxDownTime() * 1000) && (System.currentTimeMillis() - clusterStartTime > pdConfig.getStore().getMaxDownTime() * 1000))) { - //手工修改为下线或者离线达到时长 - // 修改状态为关机, 增加 checkStoreCanOffline 检测 + // Manually change the parameter to Offline or Offline Duration + // Modify the status to shut down and increase checkStoreCanOffline detect if (storeService.checkStoreCanOffline(store)) { changeStore = Metapb.Store.newBuilder(store) .setState(Metapb.StoreState.Tombstone).build(); @@ -239,22 +240,22 @@ public class TaskScheduleService { } /** - * 巡查所有的分区,检查副本数是否正确 + * Inspect all partitions to check whether the number of replicas is correct and the number + * of replicas in the shard group */ public List<Metapb.Partition> patrolPartitions() throws PDException { if (!isLeader()) { return null; } - // 副本数不一致,重新分配副本 + // If the number of replicas is inconsistent, reallocate replicas for (Metapb.ShardGroup group : storeService.getShardGroups()) { if (group.getShardsCount() != pdConfig.getPartition().getShardCount()) { storeService.reallocShards(group); - // 避免后面的 balance partition shard 马上执行. kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000); } } - //检查shard是否在线。 + // Check if the shard is online. Map<Long, Metapb.Store> tombStores = storeService.getTombStores().stream().collect( Collectors.toMap(Metapb.Store::getId, t -> t)); @@ -277,8 +278,8 @@ public class TaskScheduleService { } /** - * 在Store之间平衡分区的数量 - * 机器转为UP半小时后才能进行动态平衡 + * Balance the number of partitions between stores + * It takes half an hour for the machine to turn to UP before it can be dynamically balanced */ public synchronized Map<Integer, KVPair<Long, Long>> balancePartitionShard() throws PDException { @@ -289,7 +290,7 @@ public class TaskScheduleService { } if (System.currentTimeMillis() - lastStoreTurnoffTime < TurnOffAndBalanceInterval) { - return null;//机器下线半小时后才能进行动态平衡 + return null; } int activeStores = storeService.getActiveStores().size(); @@ -298,8 +299,6 @@ public class TaskScheduleService { return null; } - // 避免频繁调用. (当改变副本数,需要调整shard list,此时又需要平衡分区)会发送重复的指令。造成结果不可预料。 - // 严重会删除掉分区. 
if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) { return null; } @@ -309,20 +308,18 @@ public class TaskScheduleService { int averageCount = totalShards / activeStores; int remainder = totalShards % activeStores; - // 统计每个store上分区, StoreId ->PartitionID, ShardRole + // Count the partitions on each store, StoreId -> PartitionID, ShardRole Map<Long, Map<Integer, Metapb.ShardRole>> partitionMap = new HashMap<>(); storeService.getActiveStores().forEach(store -> { partitionMap.put(store.getId(), new HashMap<>()); }); - // 如果是leaner 说明迁移正在进行,不要重复提交任务 AtomicReference<Boolean> isLeaner = new AtomicReference<>(false); partitionService.getPartitions().forEach(partition -> { try { storeService.getShardList(partition.getId()).forEach(shard -> { Long storeId = shard.getStoreId(); - // 判断每个shard为leaner或者状态非正常状态 if (shard.getRole() == Metapb.ShardRole.Learner || partition.getState() != Metapb.PartitionState.PState_Normal) { isLeaner.set(true); @@ -342,24 +339,25 @@ public class TaskScheduleService { return null; } - // 按照shard数量由高到低排序store + // According to shard sort the quantity from highest to lowest List<KVPair<Long, Integer>> sortedList = new ArrayList<>(); partitionMap.forEach((storeId, shards) -> { sortedList.add(new KVPair(storeId, shards.size())); }); - // 由大到小排序的list sortedList.sort(((o1, o2) -> o2.getValue().compareTo(o1.getValue()))); - // 最大堆 + // The largest heap, moved in store -> shard count PriorityQueue<KVPair<Long, Integer>> maxHeap = new PriorityQueue<>(sortedList.size(), (o1, o2) -> o2.getValue() .compareTo( o1.getValue())); - // 各个副本的 committedIndex + // of individual copies committedIndex Map<Integer, Map<Long, Long>> committedIndexMap = partitionService.getCommittedIndexStats(); - // 分区ID --> 源StoreID,目标StoreID + // Partition ID -->source StoreID, target StoreID Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>(); - // 移除多余的shard, 按照shards由多到少的顺序遍历store,余数remainder优先给shards多的store分配,减少迁移的概率 + // Remove redundant shards, traverse the stores in the order of shards from most to + // least, and the remainder is allocated to the store with more shards first, reducing + // the probability of migration for (int index = 0; index < sortedList.size(); index++) { long storeId = sortedList.get(index).getKey(); if (!partitionMap.containsKey(storeId)) { @@ -368,7 +366,8 @@ public class TaskScheduleService { } Map<Integer, Metapb.ShardRole> shards = partitionMap.get(storeId); int targetCount = index < remainder ? averageCount + 1 : averageCount; - // 移除多余的shard, 添加源StoreID. 非Leader,并且该分区唯一 + // Remove the redundant shards and add the source StoreID. 
- // 移除多余的shard, 添加源StoreID. 非Leader,并且该分区唯一
+ // Remove the redundant shards and record the source StoreID: pick shards that are not
+ // the leader and whose partition is unique
if (shards.size() > targetCount) {
int movedCount = shards.size() - targetCount;
log.info(
@@ -420,7 +419,7 @@ public class TaskScheduleService {
if (partitionMap.containsKey(destStoreId)) {
destContains = partitionMap.get(destStoreId).containsKey(partitionId);
}
- // 如果目的store已经包含了该partition,则取一下store
+ // If the destination store already contains the partition, pick another store
if (!destContains) {
moveEntry.getValue().setValue(pair.getKey());
log.info(
@@ -442,9 +441,9 @@ public class TaskScheduleService {
kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000);
- // 开始迁移
+ // Start the migration
movedPartitions.forEach((partId, storePair) -> {
- // 源和目标storeID都不为0
+ // Neither the source nor the destination storeID is 0
if (storePair.getKey() > 0 && storePair.getValue() > 0) {
partitionService.movePartitionsShard(partId, storePair.getKey(),
storePair.getValue());
@@ -457,7 +456,7 @@ public class TaskScheduleService {
}
/**
- * 在Store之间平衡分区的Leader的数量
+ * Balance the number of partition leaders across stores
*/
public synchronized Map<Integer, Long> balancePartitionLeader(boolean immediately) throws
PDException {
@@ -475,13 +474,12 @@ public class TaskScheduleService {
List<Metapb.ShardGroup> shardGroups = storeService.getShardGroups();
- // 分裂或者缩容任务的时候,退出
+ // Exit if a split or scale-in task is in progress
var taskMeta = storeService.getTaskInfoMeta();
if (taskMeta.hasSplitTaskDoing() || taskMeta.hasMoveTaskDoing()) {
throw new PDException(1001, "split or combine task is processing, please try later!");
}
- // 数据迁移的时候,退出
if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) {
throw new PDException(1001, "balance shard is processing, please try later!");
}
@@ -502,7 +500,6 @@ public class TaskScheduleService {
log.info("balancePartitionLeader, shard group size: {}, by store: {}", shardGroups.size(),
storeShardCount);
- // 按照 target count, store id稳定排序
PriorityQueue<KVPair<Long, Integer>> targetCount =
new PriorityQueue<>(kvPairComparatorDesc);
@@ -520,7 +517,6 @@ public class TaskScheduleService {
targetCount.add(new KVPair<>(sortedGroups.get(i).getKey(), v));
sum += v;
}
- // 最后一个, 除不尽的情况,保证总数正确
targetCount.add(new KVPair<>(sortedGroups.get(sortedGroups.size() - 1).getKey(),
shardGroups.size() - sum));
log.info("target count: {}", targetCount);
@@ -529,7 +525,8 @@ public class TaskScheduleService {
var map = group.getShardsList().stream()
.collect(Collectors.toMap(Metapb.Shard::getStoreId, shard -> shard));
var tmpList = new ArrayList<KVPair<Long, Integer>>();
- // store比较多的情况,可能不包含对应的store id. 则先将不符合的store保存到临时列表,直到找到一个合适的store
+ // With many stores, the shard group may not contain the polled store ID; park the
+ // non-matching stores in a temporary list until a suitable store is found
while (!targetCount.isEmpty()) {
var pair = targetCount.poll();
var storeId = pair.getKey();
@@ -549,7 +546,7 @@ public class TaskScheduleService {
pair.setValue(pair.getValue() - 1);
tmpList.add(pair);
}
- // 找到了,则处理完成
+ // Found a suitable store, so this shard group is done
break;
} else {
tmpList.add(pair);
@@ -574,14 +571,13 @@ public class TaskScheduleService {
shardMap.forEach((storeId, committedIndex) -> {
sortedList.add(committedIndex);
});
- // 由大到小排序的list
sortedList.sort(Comparator.reverseOrder());
maxGap = sortedList.get(0) - sortedList.get(sortedList.size() - 1);
return maxGap;
}
/**
- * 执行分区分裂,分为自动分裂和手工分裂
+ * Perform partition splitting, either automatically or manually
*
* @return
* @throws PDException
*/
@@ -602,9 +598,11 @@ public class TaskScheduleService {
}
/**
- * 自动进行分区分裂,每个store达到最大分区数量
- * 执行条件
- * 分裂后每台机器分区数量少于partition.max-partitions-per-store
+ * Automatically split partitions until each store reaches its maximum partition count.
+ * Execution condition:
+ * after the split, the number of partitions on each machine is less than
+ * partition.max-partitions-per-store
*
* @throws PDException
*/
@@ -623,11 +621,7 @@ public class TaskScheduleService {
}
}
- //For TEST
- // pdConfig.getPartition().setMaxShardsPerStore(pdConfig.getPartition()
- // .getMaxShardsPerStore()*2);
-
- // 计算集群能能支持的最大split count
+ // Calculate the maximum split count that the cluster can support
int splitCount = pdConfig.getPartition().getMaxShardsPerStore() *
storeService.getActiveStores().size() /
(storeService.getShardGroups().size() *
@@ -640,12 +634,12 @@ public class TaskScheduleService {
+ pdConfig.getPartition().getMaxShardsPerStore());
}
- // 每store未达最大分区数,进行分裂
+ // Each store has not reached its maximum partition count, so perform the split
log.info("Start to split partitions..., split count = {}", splitCount);
- // 设置集群状态为下线
+ // Set the cluster status to Offline
storeService.updateClusterStatus(Metapb.ClusterState.Cluster_Offline);
- // 修改默认分区数量
+ // Modify the default number of partitions
// pdConfig.getConfigService().setPartitionCount(storeService.getShardGroups().size() *
// splitCount);
@@ -658,8 +652,9 @@ public class TaskScheduleService {
}
/**
- * Store汇报任务状态
- * 分区状态发生改变,重新计算分区所在的ShardGroup、图和整个集群的状态
+ * Store reports the task status
+ * When a partition's state changes, recalculate the state of the ShardGroup, the graph and
+ * the whole cluster that the partition belongs to
*
* @param task
*/
@@ -684,7 +679,7 @@ public class TaskScheduleService {
}
/**
- * 对rocksdb进行compaction
+ * Run compaction on RocksDB
*
* @throws PDException
*/
@@ -702,39 +697,44 @@ public class TaskScheduleService {
}
/**
- * 判断是否能把一个store的分区全部迁出,给出判断结果和迁移方案
+ * Determine whether all partitions of a store can be migrated out, and give the judgment
+ * result and migration plan
*/
public Map<String, Object> canAllPartitionsMovedOut(Metapb.Store sourceStore) throws
PDException {
if (!isLeader()) {
return null;
}
- // 分析一个store上面的分区是否可以完全迁出
+ // Analyze whether the partitions on a store can be completely migrated out
Map<String, Object> resultMap = new HashMap<>();
- // 定义对象用于保存源store上面的分区 StoreId ->PartitionID, ShardRole
+ // Map used to hold the partitions on the source store,
+ // StoreId -> PartitionID, ShardRole
Map<Long, Map<Integer, Metapb.ShardRole>> sourcePartitionMap = new HashMap<>();
sourcePartitionMap.put(sourceStore.getId(), new HashMap<>());
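[Editor's note] The selection loop further below pairs each partition to be moved with a target store taken from a min-heap, subject to a capacity check; availableDiskSpace is reported in bytes while partitionDataSize is in KB, hence the unitRate conversion. A minimal sketch of that check, using the maps from this method but a hypothetical helper name (not part of the patch):

    // Illustrative only: a candidate store can host a partition if it does not already
    // hold a replica of it and its free space (bytes) covers the partition size (KB).
    private boolean canHost(long storeId, int partitionId,
                            Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap,
                            Map<Long, Long> availableDiskSpace,
                            Map<Integer, Long> partitionDataSize) {
        final int unitRate = 1024; // conversion factor between bytes and KB
        boolean hasReplica = otherPartitionMap.get(storeId).containsKey(partitionId);
        long freeKb = availableDiskSpace.getOrDefault(storeId, 0L) / unitRate;
        return !hasReplica && freeKb >= partitionDataSize.getOrDefault(partitionId, 0L);
    }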
- // 定义对象用于保存其他活跃store上面的分区 StoreId ->PartitionID, ShardRole
+ // Map used to hold the partitions on the other active stores,
+ // StoreId -> PartitionID, ShardRole
Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap = new HashMap<>();
- Map<Long, Long> availableDiskSpace = new HashMap<>(); // 每个store剩余的磁盘空间
- Map<Integer, Long> partitionDataSize = new HashMap<>(); // 记录待迁移的分区的数据量
+ // The amount of disk space remaining on each store
+ Map<Long, Long> availableDiskSpace = new HashMap<>();
+ // Records the amount of data in each partition to be migrated
+ Map<Integer, Long> partitionDataSize = new HashMap<>();
storeService.getActiveStores().forEach(store -> {
if (store.getId() != sourceStore.getId()) {
otherPartitionMap.put(store.getId(), new HashMap<>());
- // 记录其他store的剩余的磁盘空间, 单位为Byte
+ // Records the remaining disk space of the other stores, in bytes
availableDiskSpace.put(store.getId(), store.getStats().getAvailable());
} else {
resultMap.put("current_store_is_online", true);
}
});
- // 统计待迁移的分区的数据大小 (从storeStats中统计,单位为KB)
+ // Count the data size of the partitions to be migrated (taken from storeStats, in KB)
for (Metapb.GraphStats graphStats : sourceStore.getStats().getGraphStatsList()) {
partitionDataSize.put(graphStats.getPartitionId(),
partitionDataSize.getOrDefault(graphStats.getPartitionId(), 0L)
+ graphStats.getApproximateSize());
}
- // 给sourcePartitionMap 和 otherPartitionMap赋值
+ // Populate sourcePartitionMap and otherPartitionMap
partitionService.getPartitions().forEach(partition -> {
try {
storeService.getShardList(partition.getId()).forEach(shard -> {
@@ -752,13 +752,14 @@ public class TaskScheduleService {
throw new RuntimeException(e);
}
});
- // 统计待移除的分区:即源store上面的所有分区
+ // Count the partitions to be removed, i.e. all partitions on the source store
Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>();
for (Map.Entry<Integer, Metapb.ShardRole> entry : sourcePartitionMap.get(
sourceStore.getId()).entrySet()) {
movedPartitions.put(entry.getKey(), new KVPair<>(sourceStore.getId(), 0L));
}
- // 统计其他store的分区数量, 用小顶堆保存,以便始终把分区数量较少的store优先考虑
+ // Count the number of partitions on the other stores and keep them in a min-heap, so
+ // that stores with fewer partitions are always considered first
PriorityQueue<KVPair<Long, Integer>> minHeap = new PriorityQueue<>(otherPartitionMap.size(),
(o1, o2) -> o1.getValue()
.compareTo(
o2.getValue()));
otherPartitionMap.forEach((storeId, shards) -> {
minHeap.add(new KVPair(storeId, shards.size()));
});
- // 遍历待迁移的分区,优先迁移到分区比较少的store
+ // Traverse the partitions to be migrated, preferring the stores with fewer partitions
+ // as migration targets
Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator =
movedPartitions.entrySet().iterator();
while (moveIterator.hasNext()) {
Map.Entry<Integer, KVPair<Long, Long>> moveEntry = moveIterator.next();
int partitionId = moveEntry.getKey();
- List<KVPair<Long, Integer>> tmpList = new ArrayList<>(); // 记录已经弹出优先队列的元素
+ // Records the elements that have already been polled from the priority queue
+ List<KVPair<Long, Integer>> tmpList = new ArrayList<>();
while (minHeap.size() > 0) {
- KVPair<Long, Integer> pair = minHeap.poll(); //弹出首个元素
+ KVPair<Long, Integer> pair = minHeap.poll(); // Poll the head element
long storeId = pair.getKey();
int partitionCount = pair.getValue();
Map<Integer, Metapb.ShardRole> shards = otherPartitionMap.get(storeId);
- final int unitRate = 1024; // 平衡不同存储单位的进率
+ final int unitRate = 1024; // Conversion factor between the different storage units
if ((!shards.containsKey(partitionId)) && (
availableDiskSpace.getOrDefault(storeId, 0L) / unitRate >=
partitionDataSize.getOrDefault(partitionId, 0L))) {
- // 如果目标store上面不包含该分区,且目标store剩余空间能容纳该分区,则进行迁移
- moveEntry.getValue().setValue(storeId); //设置移动的目标store
+ // If the destination store does not contain the partition and its remaining
+ // space can accommodate the partition, perform the migration
+ moveEntry.getValue().setValue(storeId); // Set the target store for the move
log.info("plan to move partition {} to store {}, " +
"available disk space {}, current partitionSize:{}",
partitionId,
storeId,
availableDiskSpace.getOrDefault(storeId, 0L) / unitRate,
partitionDataSize.getOrDefault(partitionId, 0L)
);
- // 更新该store预期的剩余空间
+ // Update the expected remaining space of the store
availableDiskSpace.put(storeId, availableDiskSpace.getOrDefault(storeId, 0L) -
partitionDataSize.getOrDefault(partitionId, 0L) * unitRate);
- // 更新统计变量中该store的分区数量
+ // Update the partition count of the store in the statistics
partitionCount += 1;
pair.setValue(partitionCount);
tmpList.add(pair);
@@ -807,7 +812,7 @@ public class TaskScheduleService {
}
minHeap.addAll(tmpList);
}
- //检查是否未存在未分配目标store的分区
+ // Check whether any partition has not been assigned a target store
List<Integer> remainPartitions = new ArrayList<>();
movedPartitions.forEach((partId, storePair) -> {
if (storePair.getValue() == 0L) {
@@ -830,10 +835,10 @@ public class TaskScheduleService {
if (!isLeader()) {
return null;
}
- // 开始迁移
+ // Start the migration
log.info("begin move partitions:");
movedPartitions.forEach((partId, storePair) -> {
- // 源和目标storeID都不为0
+ // Neither the source nor the destination storeID is 0
if (storePair.getKey() > 0 && storePair.getValue() > 0) {
partitionService.movePartitionsShard(partId, storePair.getKey(),
storePair.getValue());
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java
index 6ff66459e..0478b33da 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java
@@ -33,23 +33,26 @@ import org.springframework.stereotype.Component;
import lombok.Data;
/**
- * PD配置文件
+ * PD configuration
*/
@Data
@Component
public class PDConfig {
+ // cluster ID
@Value("${pd.cluster_id:1}")
- private long clusterId; // 集群ID
+ private long clusterId;
+ // The patrol task interval
@Value("${pd.patrol-interval:300}")
- private long patrolInterval = 300; //巡查任务时间间隔
+ private long patrolInterval = 300;
@Value("${pd.data-path}")
private String dataPath;
@Value("${pd.initial-store-count:3}")
private int minStoreCount;
- // 初始store列表,该列表内的store自动激活
+ // The initial store list; stores in this list are activated automatically
+ // format: store_address, store_address, store_address/group_id, store_address/group_id
@Value("${pd.initial-store-list: ''}")
private String initialStoreList;
@Value("${grpc.host}")
@@ -84,8 +87,8 @@ public class PDConfig {
}
/**
- * 初始分区数量
- * Store数量 * 每Store最大副本数 /每分区副本数
+ * The initial number of partitions
+ * Number of Stores * Maximum number of replicas per Store / Number of replicas per partition
*
* @return
*/
@@ -144,7 +147,7 @@ public class PDConfig {
private int port;
@Value("${pd.cluster_id:1}")
- private long clusterId; // 集群ID
+ private long clusterId;
@Value("${grpc.port}")
private int grpcPort;
@@ -157,7 +160,7 @@ public class PDConfig {
@Configuration
public class Store {
- // store 心跳超时时间
+ // Store heartbeat timeout
@Value("${store.keepAlive-timeout:300}")
private long keepAliveTimeout = 300;
@Value("${store.max-down-time:1800}")
@@ -249,11 +252,10 @@ public class PDConfig {
private int totalCount = 0;
- // 每个Store最大副本数
+ // Maximum number of replicas per Store
@Value("${partition.store-max-shard-count:24}")
private int maxShardsPerStore = 24;
- // 默认分副本数量
@Value("${partition.default-shard-count:3}")
private int shardCount = 3;
@@ -273,7 +275,8 @@ public class PDConfig {
@Configuration
public class Discovery {
- // 客户端注册后,无心跳最长次数,超过后,之前的注册信息会被删除
+ // After a client registers, if the number of missed heartbeats exceeds this limit, the
+ // previous registration information will be deleted
@Value("${discovery.heartbeat-try-count:3}")
private int heartbeatOutTimes = 3;
}
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java
index df332f46b..edcec9fc8 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java
@@ -34,7 +34,7 @@ public class ConfigMetaStore extends MetadataRocksDBStore {
}
/**
- * 更新图空间存储状态信息
+ * Update the storage status of the graph space
*
* @param
*/
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java
index a986487b2..bbff99578 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java
@@ -34,9 +34,6 @@ import com.caucho.hessian.io.Hessian2Output;
import lombok.extern.slf4j.Slf4j;
-/**
- * 自增id的实现类
- */
@Slf4j
public class IdMetaStore extends MetadataRocksDBStore {
@@ -66,14 +63,6 @@ public class IdMetaStore extends MetadataRocksDBStore {
return buf.array();
}
- /**
- * 获取自增id
- *
- * @param key
- * @param delta
- * @return
- * @throws PDException
- */
public long getId(String key, int delta) throws PDException {
Object probableLock = getLock(key);
byte[] keyBs = (ID_PREFIX + key).getBytes(Charset.defaultCharset());
@@ -107,18 +96,7 @@ public class IdMetaStore extends MetadataRocksDBStore {
}
}
- /**
- * 在删除name标识的cid的24小时内重复申请同一个name的cid保持同一值
- * 如此设计为了防止缓存的不一致,造成数据错误
- *
- * @param key
- * @param name cid 标识
- * @param max
- * @return
- * @throws PDException
- */
public long getCId(String key, String name, long max) throws PDException {
- // 检测是否有过期的cid,删除图的频率比较低,此处对性能影响不大
byte[] delKeyPrefix = (CID_DEL_SLOT_PREFIX + key +
SEPARATOR).getBytes(Charset.defaultCharset());
synchronized (this) {
@@ -136,11 +114,9 @@ public class IdMetaStore extends MetadataRocksDBStore {
}
});
- // 从延时删除队列恢复Key
byte[] cidDelayKey = getCIDDelayKey(key, name);
byte[] value = getOne(cidDelayKey);
if (value != null) {
- // 从延迟删除队列删除
remove(cidDelayKey);
return ((long[]) deserialize(value))[0];
} else {
@@ -149,23 +125,12 @@ public class IdMetaStore extends MetadataRocksDBStore {
}
}
- /**
- * 添加到删除队列,延后删除
- */
public long delCIdDelay(String key, String name, long cid) throws PDException {
byte[] delKey = getCIDDelayKey(key, name);
put(delKey, serialize(new long[]{cid,
System.currentTimeMillis()})); return cid; } - /** - * 获取自增循环不重复id, 达到上限后从0开始自增 - * - * @param key - * @param max id上限,达到该值后,重新从0开始自增 - * @return - * @throws PDException - */ public long getCId(String key, long max) throws PDException { Object probableLock = getLock(key); byte[] keyBs = (CID_PREFIX + key).getBytes(Charset.defaultCharset()); @@ -173,7 +138,7 @@ public class IdMetaStore extends MetadataRocksDBStore { byte[] bs = getOne(keyBs); long current = bs != null ? bytesToLong(bs) : 0L; long last = current == 0 ? max - 1 : current - 1; - { // 查找一个未使用的cid + { List<KV> kvs = scanRange(genCIDSlotKey(key, current), genCIDSlotKey(key, max)); for (KV kv : kvs) { if (current == bytesToLong(kv.getValue())) { @@ -218,14 +183,6 @@ public class IdMetaStore extends MetadataRocksDBStore { return bsKey; } - /** - * 删除一个循环id,释放id值 - * - * @param key - * @param cid - * @return - * @throws PDException - */ public long delCId(String key, long cid) throws PDException { return remove(genCIDSlotKey(key, cid)); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java index cc247041c..673c313df 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java @@ -24,7 +24,7 @@ import org.apache.hugegraph.pd.store.HgKVStoreImpl; import org.apache.hugegraph.pd.store.RaftKVStore; /** - * 存储工厂类,创建相关对象的存储类 + * Storage Factory class to create a storage class for related objects */ public class MetadataFactory { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataRocksDBStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataRocksDBStore.java index bf77e41c0..7a12a0afa 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataRocksDBStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataRocksDBStore.java @@ -108,7 +108,6 @@ public class MetadataRocksDBStore extends MetadataStoreBase { @Override public List<KV> scanPrefix(byte[] prefix) throws PDException { - //TODO 使用rocksdb 前缀查询 try { return this.store.scanPrefix(prefix); } catch (Exception e) { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java index 4cd9e1d36..ae7fd2079 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java @@ -30,7 +30,7 @@ import com.google.protobuf.Parser; public abstract class MetadataStoreBase { - // public long timeout = 3; // 请求超时时间,默认三秒 + // public long timeout = 3; public abstract byte[] getOne(byte[] key) throws PDException; @@ -39,9 +39,8 @@ public abstract class MetadataStoreBase { public abstract void put(byte[] key, byte[] value) throws PDException; /** - * 带有过期时间的put + * A put with an expiration time */ - public abstract void putWithTTL(byte[] key, byte[] value, long ttl) throws PDException; @@ -57,7 +56,7 @@ public abstract class MetadataStoreBase { public abstract void removeWithTTL(byte[] key) throws PDException; /** - * 前缀查询 + * Prefix queries * * @param prefix * @return @@ -66,13 +65,12 @@ public abstract class MetadataStoreBase { public abstract 
List<KV> scanPrefix(byte[] prefix) throws PDException; /** - * 前缀查询 + * Prefix queries * * @param prefix * @return * @throws PDException */ - public abstract <E> List<E> scanPrefix(Parser<E> parser, byte[] prefix) throws PDException; public abstract List<KV> scanRange(byte[] start, byte[] end) throws PDException; @@ -81,13 +79,12 @@ public abstract class MetadataStoreBase { PDException; /** - * 检查Key是否存在 + * Check if the key exists * * @param key * @return * @throws PDException */ - public abstract boolean containsKey(byte[] key) throws PDException; public abstract long remove(byte[] key) throws PDException; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java index 713a0046d..2952a1725 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java @@ -28,7 +28,7 @@ import org.apache.hugegraph.pd.grpc.Metapb; import lombok.extern.slf4j.Slf4j; /** - * 分区信息管理 + * Partition information management */ @Slf4j public class PartitionMeta extends MetadataRocksDBStore { @@ -46,7 +46,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 初始化,加载所有的分区 + * Initialize, load all partitions */ public void init() throws PDException { loadShardGroups(); @@ -69,7 +69,8 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * partition 和 shard group分开存储,再init的时候,需要加载进来 + * The partition and shard group are stored separately, and when they are init, they need to + * be loaded * * @throws PDException */ @@ -89,7 +90,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 根据id查找分区 (先从缓存找,再到数据库中找) + * Find partitions by ID (first from the cache, then from the database) * * @param graphName * @param partId @@ -124,7 +125,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 根据code查找分区 + * Find partitions based on code */ public Metapb.Partition getPartitionByCode(String graphName, long code) throws PDException { var pair = cache.getPartitionByCode(graphName, code); @@ -144,14 +145,12 @@ public class PartitionMeta extends MetadataRocksDBStore { partitionCount = pdConfig.getPartition().getTotalCount(); } - // 管理图,只有一个分区 if (graphName.endsWith("/s") || graphName.endsWith("/m")) { partitionCount = 1; } Metapb.Graph graph = cache.getGraph(graphName); if (graph == null) { - // 保存图信息 graph = Metapb.Graph.newBuilder() .setGraphName(graphName) .setPartitionCount(partitionCount) @@ -163,7 +162,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 保存分区信息 + * Save the partition information * * @param partition * @return @@ -179,14 +178,6 @@ public class PartitionMeta extends MetadataRocksDBStore { return partition; } - /** - * 检查数据库,是否存在对应的图,不存在,则创建。 - * 更新partition的 version, conf version 和 shard list - * - * @param partition - * @return - * @throws PDException - */ public Metapb.Partition updateShardList(Metapb.Partition partition) throws PDException { if (!cache.hasGraph(partition.getGraphName())) { getAndCreateGraph(partition.getGraphName()); @@ -205,7 +196,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 删除所有分区 + * Delete all partitions */ public long removeAllPartitions(String graphName) throws PDException { cache.removeAll(graphName); @@ -227,7 +218,7 @@ public class PartitionMeta extends MetadataRocksDBStore { } /** - * 获取分区状态 + * Get the partition 
status
*/
public Metapb.PartitionStats getPartitionStats(String graphName, int id) throws PDException {
byte[] prefix = MetadataKeyHelper.getPartitionStatusKey(graphName, id);
@@ -235,7 +226,7 @@
}
/**
- * 获取分区状态
+ * Get the partition status
*/
public List<Metapb.PartitionStats> getPartitionStats(String graphName) throws PDException {
byte[] prefix = MetadataKeyHelper.getPartitionStatusPrefixKey(graphName);
@@ -243,7 +234,7 @@
}
/**
- * 更新图信息
+ * Update the graph information
*
* @param graph
* @return
*/
public Metapb.Graph updateGraph(Metapb.Graph graph) throws PDException {
log.info("updateGraph {}", graph);
byte[] key = MetadataKeyHelper.getGraphKey(graph.getGraphName());
- // 保存图信息
put(key, graph.toByteString().toByteArray());
cache.updateGraph(graph);
return graph;
}
@@ -281,7 +271,7 @@
}
/**
- * 删除图,并删除图id
+ * Delete the graph and its graph ID
*/
public long removeGraph(String graphName) throws PDException {
byte[] key = MetadataKeyHelper.getGraphKey(graphName);
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java
index 2a50b0448..3019578ed 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java
@@ -28,7 +28,7 @@ import org.apache.hugegraph.pd.grpc.Metapb;
import lombok.extern.slf4j.Slf4j;
/**
- * Store信息存储
+ * Store information storage
*/
@Slf4j
public class StoreInfoMeta extends MetadataRocksDBStore {
@@ -55,7 +55,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 更新Store信息
+ * Update the Store information
*
* @param store
* @throws PDException
*/
@@ -66,7 +66,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 更新Store的存活状态
+ * Update the liveness status of the Store
*
* @param store
*/
@@ -87,7 +87,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 获取所有的store
+ * Get all the stores
*
* @param graphName
* @return
*/
@@ -99,9 +99,8 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 获取活跃的Store
+ * Get the active stores
*
- * @param graphName
* @return
* @throws PDException
*/
@@ -120,7 +119,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 检查storeid是否存在
+ * Check whether the storeID exists
*
* @param storeId
* @return
*/
@@ -131,7 +130,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * 更新存储状态信息
+ * Update the storage status information
*
* @param storeStats
*/
@@ -185,7 +184,7 @@ public class StoreInfoMeta extends MetadataRocksDBStore {
}
/**
- * @return store及状态信息
+ * @return store and status information
* @throws PDException
*/
public List<Metapb.Store> getStoreStatus(boolean isActive) throws PDException {
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java
index 756be71e9..5dbda2b09 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java
@@ -27,7 +27,7 @@
import org.apache.hugegraph.pd.grpc.pulse.MovePartition;
import org.apache.hugegraph.pd.grpc.pulse.SplitPartition;
/**
- * 任务管理
+ * Task management
*/
public class TaskInfoMeta extends MetadataRocksDBStore {
@@ -36,7 +36,7 @@ public class TaskInfoMeta extends MetadataRocksDBStore {
}
/**
- * 添加分区分裂任务
+ * Add a partition splitting task
*/
public void addSplitTask(int groupID, Metapb.Partition partition, SplitPartition splitPartition)
throws PDException {
@@ -115,9 +115,9 @@ public class TaskInfoMeta extends MetadataRocksDBStore {
}
/**
- * 按照prefix删除迁移任务,一次分组的
+ * Delete migration tasks by prefix, i.e. one whole split group at a time
*
- * @param graphName 图名称
+ * @param graphName graph name
* @throws PDException io error
*/
public void removeMoveTaskPrefix(String graphName) throws PDException {
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java
index 9169a248d..b27252fa1 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java
@@ -55,7 +55,9 @@ public class KVOperation {
private byte[] key;
private byte[] value;
- private Object attach; // 原始对象,用于本机处理,减少一次反序列化操作
+ // The original object, kept for local processing to save one deserialization
+ private Object attach;
private Object arg;
private byte op;
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
index 9ed62b0e6..67734d145 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
@@ -92,25 +92,25 @@ public class RaftEngine {
log.error("The RaftEngine parameter is incorrect."
+ " When RAFT is enabled, the number of peers " + "cannot be less than 3"); } - // 设置 Node 参数,包括日志存储路径和状态机实例 + // Set node parameters, including the log storage path and state machine instance NodeOptions nodeOptions = new NodeOptions(); nodeOptions.setFsm(stateMachine); nodeOptions.setEnableMetrics(true); - // 日志路径 + // Log path nodeOptions.setLogUri(raftPath + "/log"); - // raft 元数据路径 + // raft metadata path nodeOptions.setRaftMetaUri(raftPath + "/meta"); - // 快照路径 + // Snapshot path nodeOptions.setSnapshotUri(raftPath + "/snapshot"); - // 初始集群 + // Initial cluster nodeOptions.setInitialConf(initConf); - // 快照时间间隔 + // Snapshot interval nodeOptions.setSnapshotIntervalSecs(config.getSnapshotInterval()); nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); - // 设置 raft 配置 + // Set the raft configuration RaftOptions raftOptions = nodeOptions.getRaftOptions(); nodeOptions.setEnableMetrics(true); @@ -118,7 +118,7 @@ public class RaftEngine { final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); rpcServer = createRaftRpcServer(config.getAddress()); - // 构建 raft 组并启动 raft + // construct raft group and start raft this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer, true); this.raftNode = raftGroupService.start(false); @@ -128,7 +128,7 @@ public class RaftEngine { } /** - * 创建 raft rpc server,用于 pd 之间通讯 + * Create a Raft RPC Server for communication between PDs */ private RpcServer createRaftRpcServer(String raftAddr) { Endpoint endpoint = JRaftUtils.getEndPoint(raftAddr); @@ -164,7 +164,7 @@ public class RaftEngine { } /** - * 添加 Raft 任务,grpc 通过该接口给 raft 发送数据 + * Add a raft task, and grpc sends data to raft through this interface */ public void addTask(Task task) { if (!isLeader()) { @@ -193,7 +193,7 @@ public class RaftEngine { } /** - * 向 leader 发消息,获取 grpc 地址; + * Send a message to the leader to get the grpc address; */ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { if (isLeader()) { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java index 6e47ce4e5..8c7398a53 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java @@ -46,7 +46,7 @@ public class RaftRpcClient { } /** - * 请求快照 + * Request a snapshot */ public CompletableFuture<RaftRpcProcessor.GetMemberResponse> getGrpcAddress(final String address) { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java index e74751866..ec773ac6f 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java @@ -237,7 +237,6 @@ public class RaftStateMachine extends StateMachineAdapter { try { // TODO: remove file from meta - // SnapshotReader 沒有提供刪除文件的接口 FileUtils.deleteDirectory(new File(snapshotDir)); File file = new File(snapshotArchive); if (file.exists()) { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java 
b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java
index ec8120cc8..9bd4528a5 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java
@@ -20,7 +20,7 @@ package org.apache.hugegraph.pd.raft;
import org.apache.hugegraph.pd.common.PDException;
/**
- * 接收raft发送的数据
+ * Receives the data sent by Raft
*/
public interface RaftTaskHandler {
diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java
index ed97d13f7..b61f07ac1 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java
@@ -79,7 +79,7 @@ public class RaftKVStore implements HgKVStore, RaftTaskHandler {
}
/**
- * 查询可以不走raft,直接读取
+ * Queries can bypass Raft and read directly
*/
@Override
public byte[] get(byte[] key) throws PDException {
@@ -180,7 +180,7 @@ public class RaftKVStore implements HgKVStore, RaftTaskHandler {
}
/**
- * 需要走Raft的真实操作
+ * The actual operations that must go through Raft
*/
private void doPut(byte[] key, byte[] value) throws PDException {
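[Editor's note] To make the split between the two comments above concrete: reads are served locally, while writes are replicated before being applied, so every replica ends up executing the same operation. A self-contained, generic sketch of that pattern (illustrative only; it is not the HugeGraph PD implementation, and the replication hook merely stands in for the Raft log):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;

    // Sketch of a replicated KV store: get() bypasses replication, put() goes
    // through the replication hook first, mirroring the "bypass Raft" vs
    // "go through Raft" split described in the comments above.
    final class ReplicatedKVSketch {
        private final Map<String, String> local = new ConcurrentHashMap<>();
        private final Consumer<Map.Entry<String, String>> replicate; // stand-in for the Raft log

        ReplicatedKVSketch(Consumer<Map.Entry<String, String>> replicate) {
            this.replicate = replicate;
        }

        String get(String key) {
            return local.get(key);                    // queries read the local state directly
        }

        void put(String key, String value) {
            replicate.accept(Map.entry(key, value));  // replicate the operation first
            local.put(key, value);                    // then apply it locally, like doPut()
        }
    }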
