This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a0372f4b91b [fix][broker] Fix IndexOutOfBoundsException in the
CompactedTopicUtils (#20887)
a0372f4b91b is described below
commit a0372f4b91b4bd6600093cdb52e2db5d6ab8497e
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Jul 27 00:49:46 2023 +0800
[fix][broker] Fix IndexOutOfBoundsException in the CompactedTopicUtils
(#20887)
---
.../pulsar/compaction/CompactedTopicUtils.java | 1 +
.../pulsar/compaction/CompactedTopicUtilsTest.java | 79 ++++++++++++++++++++++
2 files changed, 80 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index cc5147c8e66..6acd33279fd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -85,6 +85,7 @@ public class CompactedTopicUtils {
}
cursor.seek(seekToPosition);
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
+ return;
}
Entry lastEntry = entries.get(entries.size() - 1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
new file mode 100644
index 00000000000..329abf9f780
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class CompactedTopicUtilsTest {
+
+ @Test
+ public void testReadCompactedEntriesWithEmptyEntries() throws
ExecutionException, InterruptedException {
+ PositionImpl lastCompactedPosition = PositionImpl.get(1, 100);
+ TopicCompactionService service =
Mockito.mock(TopicCompactionService.class);
+
Mockito.doReturn(CompletableFuture.completedFuture(Collections.emptyList()))
+ .when(service).readCompactedEntries(Mockito.any(),
Mockito.intThat(argument -> argument > 0));
+
Mockito.doReturn(CompletableFuture.completedFuture(lastCompactedPosition)).when(service)
+ .getLastCompactedPosition();
+
+
+ PositionImpl initPosition = PositionImpl.get(1, 90);
+ AtomicReference<PositionImpl> readPositionRef = new
AtomicReference<>(initPosition.getNext());
+ ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
+ Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
+ Mockito.doAnswer(invocation -> {
+ readPositionRef.set(invocation.getArgument(0));
+ return null;
+ }).when(cursor).seek(Mockito.any());
+
+ CompletableFuture<List<Entry>> completableFuture = new
CompletableFuture<>();
+ final AtomicReference<Throwable> throwableRef = new
AtomicReference<>();
+ AsyncCallbacks.ReadEntriesCallback readEntriesCallback = new
AsyncCallbacks.ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ completableFuture.complete(entries);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ completableFuture.completeExceptionally(exception);
+ throwableRef.set(exception);
+ }
+ };
+
+ CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
false,
+ readEntriesCallback, false, null);
+
+ List<Entry> entries = completableFuture.get();
+ Assert.assertTrue(entries.isEmpty());
+ Assert.assertNull(throwableRef.get());
+ Assert.assertEquals(readPositionRef.get(),
lastCompactedPosition.getNext());
+ }
+}