Copilot commented on code in PR #2685:
URL: https://github.com/apache/incubator-hugegraph/pull/2685#discussion_r2022316789


##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java:
##########
@@ -125,6 +126,105 @@ public void onPartitionRemoved(Metapb.Partition partition) {
         });
     }
 
+    public synchronized List<Metapb.Shard> allocShards(int partId) throws PDException {
+        if (storeInfoMeta.getShardGroup(partId) == null) {
+            List<Metapb.Store> stores = storeInfoMeta.getActiveStores();
+
+            if (stores.size() == 0) {
+                throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE,
+                                      "There is no any online store");
+            }
+
+            if (stores.size() < pdConfig.getMinStoreCount()) {
+                throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
+                                      "The number of active stores is less than " +
+                                      pdConfig.getMinStoreCount());
+            }
+
+            int shardCount = pdConfig.getPartition().getShardCount();
+            shardCount = Math.min(shardCount, stores.size());
+            if (shardCount == 2 || shardCount < 1) {
+                shardCount = 1;
+            }
+            allocShards();
+        }
+        return storeInfoMeta.getShardGroup(partId).getShardsList();
+    }
+
+    public synchronized void allocShards() throws PDException {
+        if (storeInfoMeta.getShardGroupCount() == 0) {
+            List<Metapb.Store> stores = storeInfoMeta.getActiveStores();
+
+            if (stores.size() == 0) {
+                throw new PDException(Pdpb.ErrorType.NO_ACTIVE_STORE_VALUE,
+                                      "There is no any online store");
+            }
+
+            if (stores.size() < pdConfig.getMinStoreCount()) {
+                throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
+                                      "The number of active stores is less than " +
+                                      pdConfig.getMinStoreCount());
+            }
+
+            int shardCount = pdConfig.getPartition().getShardCount();
+            shardCount = Math.min(shardCount, stores.size());
+            if (shardCount == 2 || shardCount < 1) {
+                shardCount = 1;
+            }
+
+            for (int groupId = 0; groupId < pdConfig.getConfigService().getPartitionCount();
+                 groupId++) {
+                int storeIdx = groupId % stores.size();
+                List<Metapb.Shard> shards = new ArrayList<>();
+                for (int i = 0; i < shardCount; i++) {
+                    Metapb.Shard shard = Metapb.Shard.newBuilder()
+                            .setStoreId(stores.get(storeIdx).getId())
+                            .setRole(i == 0 ? Metapb.ShardRole.Leader : Metapb.ShardRole.Follower)
+                            .build();
+                    shards.add(shard);
+                    storeIdx = (storeIdx + 1) >= stores.size() ? 0 : ++storeIdx;
+                }
+
+                Metapb.ShardGroup group = Metapb.ShardGroup.newBuilder()
+                        .setId(groupId)
+                        .setState(Metapb.PartitionState.PState_Normal)
+                        .addAllShards(shards)
+                        .build();
+                storeInfoMeta.updateShardGroup(group);
+                partitionService.updateShardGroupCache(group);
+                onShardGroupStatusChanged(group, group);
+                log.info("alloc shard group: id {}", groupId);
+            }
+        }
+
+    }
+
+    public int bulkloadPartitions(String graphName, String tableName,
+                                  Map<Integer, String> parseHdfsPathMap,
+                                  Integer maxDownloadRate) throws PDException {
+        partitionService.bulkloadPartitions(graphName, tableName, parseHdfsPathMap, maxDownloadRate);
+        while (true) {
+

Review Comment:
   Consider adding a timeout or exit condition to this indefinite loop to 
prevent a potential hang if the bulkload task status never reaches a terminal 
state.
   ```suggestion
           int maxRetries = 60; // Maximum number of retries (10 minutes)
           int retries = 0;
           while (retries < maxRetries) {
   ```
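   The bounded-retry idea above generalizes into a small polling helper. The sketch below is a minimal, self-contained illustration of that pattern; the names (`BoundedPoller`, `pollUntil`, `isTerminal`) are hypothetical and not part of the PD API:

   ```java
   import java.util.function.Predicate;
   import java.util.function.Supplier;

   // A minimal sketch of the bounded-retry polling pattern the review suggests.
   public class BoundedPoller {

       /**
        * Polls {@code poll} until {@code isTerminal} accepts the result or the
        * retry budget runs out. Returns the terminal state, or null on timeout
        * (or interruption), so the caller can fail the bulkload explicitly
        * instead of hanging forever.
        */
       public static <T> T pollUntil(Supplier<T> poll,
                                     Predicate<T> isTerminal,
                                     int maxRetries,
                                     long intervalMillis) {
           for (int retries = 0; retries < maxRetries; retries++) {
               T state = poll.get();
               if (isTerminal.test(state)) {
                   return state;
               }
               try {
                   Thread.sleep(intervalMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return null; // treat interruption like a timeout
               }
           }
           return null; // budget exhausted before a terminal state appeared
       }
   }
   ```

   With `maxRetries = 60` and a 10-second interval this matches the 10-minute budget in the suggestion, while keeping the timeout decision in one place.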



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java:
##########
@@ -1156,6 +1254,89 @@ public synchronized void handleMoveTask(MetaTask.Task task) throws PDException {
         }
     }
 
+    public void updateStatus(Integer id, MetaTask.TaskState state) {
+        statusMap.compute(id, (key, statusInfo) -> {
+            if (statusInfo == null) {
+                statusInfo = new StatusInfo();
+            }
+            if (state == MetaTask.TaskState.Task_Success) {
+                int count = statusInfo.successCount.incrementAndGet();
+                if (count >= 3) {
+                    statusInfo.state = MetaTask.TaskState.Task_Success;
+                }
+            }
+            else if (state == MetaTask.TaskState.Task_Failure) {
+                statusInfo.state = MetaTask.TaskState.Task_Failure;
+            }
+            return statusInfo;
+        });
+    }
+
+
+    public synchronized void handleBulkloadTask(MetaTask.Task task) throws PDException {
+        log.info("handle report bulkload task, graph:{}, pid : {}, task: {}",
+                 task.getPartition().getGraphName(), task.getPartition().getId(), task);
+
+        var taskInfoMeta = storeService.getTaskInfoMeta();
+        var partition = task.getPartition();
+        MetaTask.Task pdMetaTask =
+                taskInfoMeta.getBulkloadTask(partition.getGraphName(), partition.getId());
+
+        MetaTask.TaskState state = task.getState();
+
+        log.info("report bulkload task, graph:{}, pid : {}, state: {}",
+                 task.getPartition().getGraphName(),
+                 task.getPartition().getId(),
+                 task.getState());
+
+        updateStatus(partition.getId(), state);
+
+        if (statusMap.get(partition.getId()).state != null &&
+            statusMap.get(partition.getId()).state != MetaTask.TaskState.Task_Ready) {
+            var newTask =
+                    pdMetaTask.toBuilder().setState(statusMap.get(partition.getId()).state).build();
+

Review Comment:
   [nitpick] Store the result of 'statusMap.get(partition.getId())' in a local 
variable to avoid repeated lookups and to improve code readability.
   ```suggestion
           var partitionStatus = statusMap.get(partition.getId());
           if (partitionStatus.state != null &&
               partitionStatus.state != MetaTask.TaskState.Task_Ready) {
           var newTask =
                   pdMetaTask.toBuilder().setState(partitionStatus.state).build();
   ```
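   The same `statusMap.compute` pattern used by `updateStatus` can be sketched in isolation, with the cached local lookup the nitpick asks for. `StatusTracker`/`StatusInfo` below are simplified stand-ins, not the PD classes, and the string states stand in for `MetaTask.TaskState`:

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   // Simplified stand-in for the PD per-partition status bookkeeping.
   public class StatusTracker {

       public static class StatusInfo {
           public final AtomicInteger successCount = new AtomicInteger();
           public volatile String state; // e.g. "Task_Success" / "Task_Failure"
       }

       private final ConcurrentHashMap<Integer, StatusInfo> statusMap = new ConcurrentHashMap<>();

       /** Records one success report; flips to a terminal state after 3 reports. */
       public void reportSuccess(int partitionId) {
           // compute() runs atomically per key, so the count-and-flip is race-free.
           statusMap.compute(partitionId, (id, info) -> {
               if (info == null) {
                   info = new StatusInfo();
               }
               if (info.successCount.incrementAndGet() >= 3) {
                   info.state = "Task_Success";
               }
               return info;
           });
       }

       /** One map lookup cached in a local variable, as the review suggests. */
       public String stateOf(int partitionId) {
           StatusInfo info = statusMap.get(partitionId); // single lookup, reused below
           return info == null ? null : info.state;
       }
   }
   ```

   Caching the lookup also avoids a subtle hazard in the original: repeated `statusMap.get(...)` calls could in principle observe different snapshots between the null check and the use.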



##########
hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java:
##########
@@ -89,6 +105,34 @@ public String splitPartitions() {
         }
     }
 
+    @PostMapping(value = "/bulkload")
+    public Map<String, String> bulkload(@RequestBody BulkloadRestRequest request) throws PDException {
+        if (isLeader()) {
+            return handleBulkload(request);
+        } else {
+            String leaderAddress = RaftEngine.getInstance().getLeader().getIp();
+            String url = "http://" + leaderAddress + ":8620" + "/v1/task/bulkload";

Review Comment:
   [nitpick] Consider extracting the hardcoded port value into a constant or 
configuration property to simplify future updates and improve maintainability.
   ```suggestion
            String url = "http://" + leaderAddress + ":" + LEADER_PORT + "/v1/task/bulkload";
   ```
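   A hedged sketch of what the extraction could look like; the `LEADER_REST_PORT` constant and `bulkloadUrl` helper are hypothetical names (in practice the port would more likely come from PD configuration, e.g. an injected property, than a compile-time constant):

   ```java
   // Hypothetical helper for building the leader-forwarding URL.
   public class LeaderUrlBuilder {

       // Default PD REST port; ideally sourced from configuration rather
       // than hardcoded at each call site.
       public static final int LEADER_REST_PORT = 8620;

       /** Builds the bulkload forwarding URL from the leader's IP and port. */
       public static String bulkloadUrl(String leaderIp, int port) {
           return "http://" + leaderIp + ":" + port + "/v1/task/bulkload";
       }
   }
   ```

   Centralizing the URL construction means a future port or path change touches one method instead of every forwarding endpoint in `TaskAPI`.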



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

