lollipopjin commented on code in PR #10519:
URL: https://github.com/apache/rocketmq/pull/10519#discussion_r3452352655


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java:
##########
@@ -186,6 +206,248 @@ public CompletableFuture<RemotingCommand> 
processRequestAsync(final Channel chan
         });
     }
 
+    protected CompletableFuture<RemotingCommand> 
processBatchRequestAsync(final Channel channel,
+        RemotingCommand request, boolean brokerAllowSuspend) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+
+        BatchChangeInvisibleTimeRequestHeader batchRequestHeader;
+        try {
+            batchRequestHeader = (BatchChangeInvisibleTimeRequestHeader)
+                
request.decodeCommandCustomHeader(BatchChangeInvisibleTimeRequestHeader.class);
+        } catch (Throwable t) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark("batch change invisible time request header is 
invalid");
+            return CompletableFuture.completedFuture(response);
+        }
+
+        BatchChangeInvisibleTimeRequestBody requestBody =
+            BatchChangeInvisibleTimeRequestBody.decode(request.getBody(), 
BatchChangeInvisibleTimeRequestBody.class);
+        List<ChangeInvisibleTimeRequestEntry> requestEntries = requestBody == 
null || requestBody.getEntries() == null ?
+            Collections.emptyList() : requestBody.getEntries();
+        int batchMaxNum = Math.max(1, 
brokerController.getBrokerConfig().getBatchChangeInvisibleTimeMaxNum());
+        if (requestEntries.size() > batchMaxNum) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark(String.format("batch change invisible time 
entries exceed limit: %d",
+                batchMaxNum));
+            return CompletableFuture.completedFuture(response);
+        }
+
+        ChangeInvisibleTimeResponseEntry[] responseEntries = new 
ChangeInvisibleTimeResponseEntry[requestEntries.size()];
+        for (int i = 0; i < requestEntries.size(); i++) {
+            responseEntries[i] = 
buildFailedResponseEntry(ResponseCode.SYSTEM_ERROR);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        if (!validateBatchRequestEntries(batchRequestHeader, requestEntries)) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark("batch change invisible time entries must use 
the same topic and consumerGroup as request header");
+            return CompletableFuture.completedFuture(response);
+        }
+
+        if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) 
{
+            List<ChangeInvisibleTimeRequestEntry> kvChangeRecords = new 
ArrayList<>();
+            List<Integer> kvIndexes = new ArrayList<>();
+            List<ChangeInvisibleTimeResponseEntry> kvSuccessEntries = new 
ArrayList<>();
+            for (int i = 0; i < requestEntries.size(); i++) {
+                ChangeInvisibleTimeRequestEntry requestEntry = 
requestEntries.get(i);
+                if (tryAppendBatchKvChange(channel, requestEntry, i, 
responseEntries, kvChangeRecords,
+                    kvIndexes, kvSuccessEntries)) {
+                    continue;
+                }
+                appendSingleBatchEntry(channel, requestEntry, 
request.getOpaque(), brokerAllowSuspend,
+                    responseEntries, futures, i);
+            }
+
+            if (!kvChangeRecords.isEmpty()) {
+                try {
+                    
brokerController.getPopConsumerService().batchChangeInvisibilityDuration(kvChangeRecords);
+                    for (int i = 0; i < kvIndexes.size(); i++) {
+                        responseEntries[kvIndexes.get(i)] = 
kvSuccessEntries.get(i);
+                    }
+                } catch (Throwable t) {
+                    POP_LOGGER.error("batch change invisibility duration 
failed", t);
+                    for (Integer index : kvIndexes) {
+                        responseEntries[index] = 
buildFailedResponseEntry(ResponseCode.SYSTEM_ERROR);
+                    }
+                }
+            }
+        } else {
+            for (int i = 0; i < requestEntries.size(); i++) {
+                appendSingleBatchEntry(channel, requestEntries.get(i), 
request.getOpaque(), brokerAllowSuspend,
+                    responseEntries, futures, i);
+            }
+        }
+
+        return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).thenApply(ignored -> {
+            BatchChangeInvisibleTimeResponseBody responseBody = new 
BatchChangeInvisibleTimeResponseBody();
+            responseBody.setEntries(Arrays.asList(responseEntries));
+            response.setBody(responseBody.encode());
+            return response;
+        });
+    }
+
+    protected boolean 
validateBatchRequestEntries(BatchChangeInvisibleTimeRequestHeader 
batchRequestHeader,

Review Comment:
   How about renaming it to normalizeAndValidateBatchRequestEntries to reflect 
that it mutates the input requestEntries (back-filling consumerGroup / topic 
from the header)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to