This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a0372f4b91b [fix][broker] Fix IndexOutOfBoundsException in the CompactedTopicUtils (#20887)
a0372f4b91b is described below

commit a0372f4b91b4bd6600093cdb52e2db5d6ab8497e
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Jul 27 00:49:46 2023 +0800

    [fix][broker] Fix IndexOutOfBoundsException in the CompactedTopicUtils (#20887)
---
 .../pulsar/compaction/CompactedTopicUtils.java     |  1 +
 .../pulsar/compaction/CompactedTopicUtilsTest.java | 79 ++++++++++++++++++++++
 2 files changed, 80 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index cc5147c8e66..6acd33279fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -85,6 +85,7 @@ public class CompactedTopicUtils {
                             }
                             cursor.seek(seekToPosition);
                             callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
+                            return;
                         }
 
                         Entry lastEntry = entries.get(entries.size() - 1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
new file mode 100644
index 00000000000..329abf9f780
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class CompactedTopicUtilsTest {
+
+    @Test
+    public void testReadCompactedEntriesWithEmptyEntries() throws ExecutionException, InterruptedException {
+        PositionImpl lastCompactedPosition = PositionImpl.get(1, 100);
+        TopicCompactionService service = Mockito.mock(TopicCompactionService.class);
+        Mockito.doReturn(CompletableFuture.completedFuture(Collections.emptyList()))
+                .when(service).readCompactedEntries(Mockito.any(), Mockito.intThat(argument -> argument > 0));
+        Mockito.doReturn(CompletableFuture.completedFuture(lastCompactedPosition)).when(service)
+                .getLastCompactedPosition();
+
+
+        PositionImpl initPosition = PositionImpl.get(1, 90);
+        AtomicReference<PositionImpl> readPositionRef = new AtomicReference<>(initPosition.getNext());
+        ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
+        Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
+        Mockito.doAnswer(invocation -> {
+            readPositionRef.set(invocation.getArgument(0));
+            return null;
+        }).when(cursor).seek(Mockito.any());
+
+        CompletableFuture<List<Entry>> completableFuture = new CompletableFuture<>();
+        final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+        AsyncCallbacks.ReadEntriesCallback readEntriesCallback = new AsyncCallbacks.ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                completableFuture.complete(entries);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                completableFuture.completeExceptionally(exception);
+                throwableRef.set(exception);
+            }
+        };
+
+        CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false,
+                readEntriesCallback, false, null);
+
+        List<Entry> entries = completableFuture.get();
+        Assert.assertTrue(entries.isEmpty());
+        Assert.assertNull(throwableRef.get());
+        Assert.assertEquals(readPositionRef.get(), lastCompactedPosition.getNext());
+    }
+}

Reply via email to