Copilot commented on code in PR #2685:
URL:
https://github.com/apache/incubator-hugegraph/pull/2685#discussion_r2022316789
##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java:
##########
@@ -125,6 +126,105 @@ public void onPartitionRemoved(Metapb.Partition
partition) {
});
}
+ public synchronized List<Metapb.Shard> allocShards(int partId) throws
PDException {
+ if (storeInfoMeta.getShardGroup(partId) == null) {
+ List<Metapb.Store> stores = storeInfoMeta.getActiveStores();
+
+ if (stores.size() == 0) {
+ throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE,
+ "There is no any online store");
+ }
+
+ if (stores.size() < pdConfig.getMinStoreCount()) {
+ throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
+ "The number of active stores is less
then " +
+ pdConfig.getMinStoreCount());
+ }
+
+ int shardCount = pdConfig.getPartition().getShardCount();
+ shardCount = Math.min(shardCount, stores.size());
+ if (shardCount == 2 || shardCount < 1) {
+ shardCount = 1;
+ }
+ allocShards();
+ }
+ return storeInfoMeta.getShardGroup(partId).getShardsList();
+ }
+
+ public synchronized void allocShards() throws PDException {
+ if (storeInfoMeta.getShardGroupCount() ==
+ 0) {
+ List<Metapb.Store> stores = storeInfoMeta.getActiveStores();
+
+ if (stores.size() == 0) {
+ throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE,
+ "There is no any online store");
+ }
+
+ if (stores.size() < pdConfig.getMinStoreCount()) {
+ throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
+ "The number of active stores is less
then " +
+ pdConfig.getMinStoreCount());
+ }
+
+ int shardCount = pdConfig.getPartition().getShardCount();
+ shardCount = Math.min(shardCount, stores.size());
+ if (shardCount == 2 || shardCount < 1) {
+ shardCount = 1;
+ }
+
+ for (int groupId = 0; groupId <
pdConfig.getConfigService().getPartitionCount();
+ groupId++) {
+ int storeIdx = groupId % stores.size();
+ List<Metapb.Shard> shards = new ArrayList<>();
+ for (int i = 0; i < shardCount; i++) {
+ Metapb.Shard shard = Metapb.Shard.newBuilder()
+
.setStoreId(stores.get(storeIdx).getId())
+ .setRole(i == 0 ?
Metapb.ShardRole.Leader :
+
Metapb.ShardRole.Follower)
+ .build();
+ shards.add(shard);
+ storeIdx = (storeIdx + 1) >= stores.size() ? 0 :
++storeIdx;
+ }
+
+ Metapb.ShardGroup group = Metapb.ShardGroup.newBuilder()
+ .setId(groupId)
+ .setState(
+
Metapb.PartitionState.PState_Normal)
+
.addAllShards(shards)
+ .build();
+ storeInfoMeta.updateShardGroup(group);
+ partitionService.updateShardGroupCache(group);
+ onShardGroupStatusChanged(group, group);
+ log.info("alloc shard group: id {}", groupId);
+ }
+ }
+
+ }
+
+ public int bulkloadPartitions(String graphName, String tableName,
+ Map<Integer, String>
parseHdfsPathMap,Integer maxDownloadRate) throws
+
PDException {
+ partitionService.bulkloadPartitions(graphName, tableName,
parseHdfsPathMap,maxDownloadRate);
+ while (true) {
+
Review Comment:
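Consider adding a timeout or exit condition to this indefinite loop to
prevent a potential hang if the bulkload task status never reaches a terminal
state. Note that the retry counter must be incremented on every iteration, and
the method should report a timeout error once the limit is reached (the
10-minute figure below assumes each iteration waits about 10 seconds).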
```suggestion
int maxRetries = 60; // Maximum number of retries (10 minutes)
int retries = 0;
while (retries < maxRetries) {
```
##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java:
##########
@@ -1156,6 +1254,89 @@ public synchronized void handleMoveTask(MetaTask.Task
task) throws PDException {
}
}
+ public void updateStatus(Integer id, MetaTask.TaskState state) {
+ statusMap.compute(id, (key, statusInfo) -> {
+ if (statusInfo == null) {
+ statusInfo = new StatusInfo();
+ }
+ if (state == MetaTask.TaskState.Task_Success) {
+ int count = statusInfo.successCount.incrementAndGet();
+ if (count >= 3) {
+ statusInfo.state = MetaTask.TaskState.Task_Success;
+ }
+ }
+ else if (state == MetaTask.TaskState.Task_Failure) {
+ statusInfo.state = MetaTask.TaskState.Task_Failure;
+ }
+ return statusInfo;
+ });
+ }
+
+
+ public synchronized void handleBulkloadTask(MetaTask.Task task) throws
PDException {
+ log.info("handle report bulkload task, graph:{}, pid : {}, task: {}",
+ task.getPartition().getGraphName(),
task.getPartition().getId(), task);
+
+ var taskInfoMeta = storeService.getTaskInfoMeta();
+ var partition = task.getPartition();
+ MetaTask.Task pdMetaTask =
+ taskInfoMeta.getBulkloadTask(partition.getGraphName(),
partition.getId());
+
+ MetaTask.TaskState state = task.getState();
+
+ log.info("report bulkload task, graph:{}, pid : {}, state: {}",
+ task.getPartition().getGraphName(),
+ task.getPartition().getId(),
+ task.getState());
+
+ updateStatus(partition.getId(), state);
+
+ if (statusMap.get(partition.getId()).state != null &&
+ statusMap.get(partition.getId()).state !=
MetaTask.TaskState.Task_Ready) {
+ var newTask =
+
pdMetaTask.toBuilder().setState(statusMap.get(partition.getId()).state).build();
+
Review Comment:
[nitpick] Store the result of 'statusMap.get(partition.getId())' in a local
variable to avoid repeated lookups and to improve code readability.
```suggestion
var partitionStatus = statusMap.get(partition.getId());
if (partitionStatus.state != null &&
partitionStatus.state != MetaTask.TaskState.Task_Ready) {
var newTask =
pdMetaTask.toBuilder().setState(partitionStatus.state).build();
```
##########
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java:
##########
@@ -89,6 +105,34 @@ public String splitPartitions() {
}
}
+ @PostMapping(value = "/bulkload")
+ public Map<String,String> bulkload(@RequestBody BulkloadRestRequest
request) throws PDException {
+ if (isLeader()) {
+ return handleBulkload(request);
+ } else {
+ String leaderAddress =
RaftEngine.getInstance().getLeader().getIp();
+ String url = "http://" + leaderAddress+":8620" +
"/v1/task/bulkload";
Review Comment:
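[nitpick] Consider extracting the hardcoded port value into a constant or
configuration property to simplify future updates and improve maintainability.
The `LEADER_PORT` name used below is a placeholder: declare it as a constant
or, preferably, read the leader's REST port from configuration.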
```suggestion
String url = "http://" + leaderAddress + ":" + LEADER_PORT +
"/v1/task/bulkload";
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]