Copilot commented on code in PR #23907:
URL: https://github.com/apache/pulsar/pull/23907#discussion_r2498915149


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java:
##########
@@ -468,6 +472,71 @@ public void 
testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType sub
         admin.topics().deletePartitionedTopic(topic);
     }
 
+    @Test
+    public void testDelayedMessageCancel() throws Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedMessageCancel");
+        final String subName = "shared-sub";
+        CountDownLatch latch = new CountDownLatch(99);
+        admin.topics().createPartitionedTopic(topic, 2);
+        Set<String> receivedMessages1 = ConcurrentHashMap.newKeySet();
+        Set<String> receivedMessages2 = ConcurrentHashMap.newKeySet();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName + "-1")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((Consumer<String> c, Message<String> msg) -> {
+                    receivedMessages1.add(msg.getValue());
+                    c.acknowledgeAsync(msg);
+                    latch.countDown();
+                })
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName + "-2")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((Consumer<String> c, Message<String> msg) -> {
+                    receivedMessages2.add(msg.getValue());
+                    c.acknowledgeAsync(msg);
+                    latch.countDown();
+                })
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        List<MessageId> messageIds = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            final long deliverAtTime = System.currentTimeMillis() + 3000L;
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(i))
+                    .value("msg-" + i)
+                    .deliverAt(deliverAtTime)
+                    .send();
+            messageIds.add(i, messageId);
+        }
+
+        final int cancelMessage = 50;
+        MessageIdImpl messageId = (MessageIdImpl) 
messageIds.get(cancelMessage);
+
+        Map<String,String> ackMessageIds = new HashMap<>();
+        ackMessageIds.put(String.valueOf(messageId.getLedgerId()), 
String.valueOf(messageId.getEntryId()));
+
+        admin.topics().skipMessages(topic + "-partition-0", subName + "-1", 
ackMessageIds);
+        admin.topics().skipMessages(topic + "-partition-1", subName + "-1", 
ackMessageIds);
+
+        assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were 
received in time");
+        assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
+                        || receivedMessages2.contains("msg-" + cancelMessage))

Review Comment:
   The assertion logic is incorrect. The condition `assertFalse((A || B) && C)` 
will pass (return true for assertFalse) when either: (1) the message WAS 
received, OR (2) the total count is NOT 99. This is the opposite of the 
intended behavior. The assertion should verify that the message was NOT 
received AND the count is 99. Consider: 
`assertTrue(!receivedMessages1.contains(\"msg-\" + cancelMessage) && 
!receivedMessages2.contains(\"msg-\" + cancelMessage) && 
(receivedMessages1.size() + receivedMessages2.size() == 99))`
   ```suggestion
           assertTrue(
                   !receivedMessages1.contains("msg-" + cancelMessage)
                           && !receivedMessages2.contains("msg-" + 
cancelMessage)
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -23,6 +23,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import io.netty.buffer.ByteBuf;
+import java.io.IOException;

Review Comment:
   The import `java.io.IOException` is not used in this file and should be 
removed.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1595,6 +1595,42 @@ public void skipMessages(
         }
     }
 
+    @POST
+    
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds")
+    @ApiOperation(value = "Skipping messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic or 
subscription does not exist"),
+            @ApiResponse(code = 405, message = "Skipping messages on a 
partitioned topic is not allowed"),

Review Comment:
   The API response documentation at line 1607 states 'Skipping messages on a 
partitioned topic is not allowed', but the implementation in 
`internalSkipByMessageIds` (lines 1962-1972) actually supports partitioned 
topics by iterating through partitions. This documentation is misleading and 
should either be removed or updated to reflect that partitioned topics are 
supported.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1947,109 @@ protected void internalSkipMessages(AsyncResponse 
asyncResponse, String subName,
         });
     }
 
+    protected void internalSkipByMessageIds(AsyncResponse asyncResponse, 
String subName, boolean authoritative,
+                                            Map<String, String> messageIds) {
+        CompletableFuture<Void> validationFuture = 
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+        validationFuture = validationFuture.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        });
+        validationFuture.thenCompose(__ -> 
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                subName, authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+                                    messageIds, subName);
+                        } else {
+                            
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                    subName, authoritative);
+                        }
+                    }
+                }).exceptionally(ex -> {
+                    if (isNot307And404Exception(ex)) {
+                        log.error("[{}] Failed to ack messages on topic {}: 
{}", clientAppId(), topicName, ex);

Review Comment:
   The log message says 'Failed to ack messages' but this is in the context of 
skipping messages by message IDs. The message should be updated to 'Failed to 
skip messages by message IDs on topic {}: {}' to accurately reflect the 
operation being performed.
   ```suggestion
                           log.error("[{}] Failed to skip messages by message 
IDs on topic {}: {}", clientAppId(), topicName, ex);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1947,109 @@ protected void internalSkipMessages(AsyncResponse 
asyncResponse, String subName,
         });
     }
 
+    protected void internalSkipByMessageIds(AsyncResponse asyncResponse, 
String subName, boolean authoritative,
+                                            Map<String, String> messageIds) {
+        CompletableFuture<Void> validationFuture = 
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+        validationFuture = validationFuture.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        });
+        validationFuture.thenCompose(__ -> 
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                subName, authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+                                    messageIds, subName);
+                        } else {
+                            
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                    subName, authoritative);
+                        }
+                    }
+                }).exceptionally(ex -> {
+                    if (isNot307And404Exception(ex)) {
+                        log.error("[{}] Failed to ack messages on topic {}: 
{}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                             
PartitionedTopicMetadata partitionMetadata,
+                                                             Map<String, 
String> messageIds,
+                                                             String subName) {
+        final List<CompletableFuture<Void>> futures = new 
ArrayList<>(partitionMetadata.partitions);
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        for (int i = 0; i < partitionMetadata.partitions; i++) {
+            TopicName topicNamePartition = topicName.getPartition(i);
+            futures.add(admin
+                    .topics()
+                    .skipMessagesAsync(topicNamePartition.toString(), subName, 
messageIds));
+        }
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                Throwable t = FutureUtil.unwrapCompletionException(exception);
+                log.warn("[{}] Failed to ack messages on some partitions of 
{}: {}",

Review Comment:
   The log message says 'Failed to ack messages' but should say 'Failed to skip 
messages by message IDs' to match the actual operation being performed in this 
context.
   ```suggestion
                   log.warn("[{}] Failed to skip messages by message IDs on 
some partitions of {}: {}",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to