This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1c1fd4af347 [fix][broker] Fix compaction horizon might be reset to an old position when phase two is interrupted (#25119)
1c1fd4af347 is described below

commit 1c1fd4af3476c676c29d3b4831f64ee63a4c2a16
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Jan 4 19:03:56 2026 +0800

    [fix][broker] Fix compaction horizon might be reset to an old position when phase two is interrupted (#25119)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   8 +-
 .../compaction/AbstractTwoPhaseCompactor.java      |   6 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 140 ++++++++++++++++-----
 .../pulsar/compaction/StrategicCompactionTest.java |  45 ++++---
 4 files changed, 150 insertions(+), 49 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ab15a6d6a17..b3cb8dc4596 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1572,7 +1572,12 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         final AsyncCallbacks.ResetCursorCallback callback = 
resetCursorCallback;
 
-        final Position newMarkDeletePosition = 
ledger.getPreviousPosition(newReadPosition);
+        final Position newMarkDeletePosition;
+        if (isCompactionCursor()) {
+            newMarkDeletePosition = markDeletePosition;
+        } else {
+            newMarkDeletePosition = 
ledger.getPreviousPosition(newReadPosition);
+        }
 
         Runnable alignAcknowledgeStatusAfterPersisted = () -> {
             // Correct the variable "messagesConsumedCounter".
@@ -1662,7 +1667,6 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         persistentMarkDeletePosition = null;
         inProgressMarkDeletePersistPosition = null;
-        lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, 
getProperties(), null, null);
         internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? 
getProperties() : Collections.emptyMap(),
                 new MarkDeleteCallback() {
             @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index ddfe8825a88..7aba181cb44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.time.Duration;
@@ -59,6 +60,8 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
 
+  @VisibleForTesting
+  static Runnable injectionAfterSeekInPhaseTwo = () -> {};
   private static final Logger log = 
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
   protected static final int MAX_OUTSTANDING = 500;
   protected final Duration phaseOneLoopReadTimeout;
@@ -188,6 +191,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
     CompletableFuture<Long> promise = new CompletableFuture<>();
 
     reader.seekAsync(from).thenCompose((v) -> {
+          injectionAfterSeekInPhaseTwo.run();
           Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
           CompletableFuture<Void> loopPromise = new CompletableFuture<>();
           phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise, MessageId.earliest);
@@ -436,4 +440,4 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
   public long getPhaseOneLoopReadTimeoutInSeconds() {
     return phaseOneLoopReadTimeout.getSeconds();
   }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 889eb2b8a35..077cf9d0b11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -109,7 +110,8 @@ import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -121,7 +123,13 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     protected BookKeeper bk;
     private PublishingOrderCompactor compactor;
 
-    @BeforeMethod
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setDispatcherMaxReadBatchSize(1);
+    }
+
+    @BeforeClass
     @Override
     public void setup() throws Exception {
         super.internalSetup();
@@ -139,7 +147,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
@@ -149,6 +157,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @BeforeMethod(alwaysRun = true)
+    public void beforeMethod() throws Exception {
+        admin.namespaces().removeRetention("my-tenant/my-ns");
+        AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+    }
+
     protected long compact(String topic) throws ExecutionException, 
InterruptedException {
         return compactor.compact(topic).get();
     }
@@ -165,7 +179,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompaction() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/compaction";
         final int numMessages = 20;
         final int maxKeys = 10;
 
@@ -229,7 +243,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactionWithReader() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/compaction-with-reader";
         final int numMessages = 20;
         final int maxKeys = 10;
 
@@ -290,7 +304,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testReadCompactedBeforeCompaction() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/read-compacted-before-compaction";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -330,7 +344,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testReadEntriesAfterCompaction() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/read-entries-after-compaction";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -361,7 +375,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testSeekEarliestAfterCompaction() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/seek-earliest-after-compaction";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -402,7 +416,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBrokerRestartAfterCompaction() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/test-restart-after-compaction";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -444,7 +458,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEmptyTopic() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/compact-empty-topic";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topic)
@@ -467,7 +481,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testFirstMessageRetained() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/first-message-retained";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -508,7 +522,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBatchMessageIdsDontChange() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/batch-message-ids-dont-change";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -571,7 +585,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBatchMessageWithNullValue() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/batch-message-with-null-value";
 
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .receiverQueueSize(1).readCompacted(true).subscribe().close();
@@ -625,7 +639,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testWholeBatchCompactedOut() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/whole-batch-compacted-out";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -670,7 +684,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         restartBroker();
         FieldUtils.writeField(compactor, "topicCompactionRetainNullKey", 
retainNullKey, true);
 
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/key-less-messages-pass-through-" + retainNullKey;
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -737,7 +751,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testEmptyPayloadDeletes() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/empty-payload-deletes";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -819,7 +833,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/empty-payload-deletes-when-compressed";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -900,7 +914,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactorReadsCompacted() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compactor-reads-compacted";
 
         // capture opened ledgers
         Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
@@ -952,6 +966,12 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
         ledgersOpened.clear();
 
+        try (Producer<byte[]> producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
+            producerNormal.newMessage()
+                    .key("key2")
+                    .value("my-message".getBytes())
+                    .send();
+        }
         // force broker to close resources for topic
         
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
 
@@ -1000,7 +1020,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactCompressedNoBatch() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-compressed-no-batch";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1039,7 +1059,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactCompressedBatching() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-compressed-batching";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1118,7 +1138,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEncryptedNoBatch() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-encrypted-no-batch";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1160,7 +1180,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEncryptedBatching() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-encrypted-batching";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1216,7 +1236,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-encrypted-and-compressed-no-batch";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1259,7 +1279,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testCompactEncryptedAndCompressedBatching() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compact-encrypted-and-compressed-batching";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1317,7 +1337,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/empty-payload-deletes-when-encrypted";
 
         // subscribe before sending anything, so that we get all messages
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1413,7 +1433,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testCompactionWithLastDeletedKey(boolean batching) throws 
Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = 
"persistent://my-tenant/my-ns/compaction-with-last-deleted-key-" + batching;
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1439,7 +1459,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
     public void testEmptyCompactionLedger(boolean batching) throws Exception {
-        String topic = "persistent://my-tenant/my-ns/my-topic1";
+        String topic = "persistent://my-tenant/my-ns/empty-compaction-ledger-" 
+ batching;
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -2347,7 +2367,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 120 * 1000)
     public void testConcurrentCompactionAndTopicDelete() throws Exception {
-        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/tp");
+        final String topicName = 
newUniqueName("persistent://my-tenant/my-ns/concurrent-compaction-topic-delete");
         admin.topics().createNonPartitionedTopic(topicName);
         // Load up the topic.
         Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
@@ -2460,4 +2480,68 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         assertEquals(results, expected);
     }
+
+    @Test
+    public void testPhaseTwoInterruption() throws Exception {
+        // Set infinite retention to retain all original ledgers
+        admin.namespaces().setRetention("my-tenant/my-ns", new 
RetentionPolicies(-1, -1));
+        final var topic = 
"persistent://my-tenant/my-ns/phase-two-interruption";
+        @Cleanup final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        final BiConsumer<String, String> send = (key, value) -> {
+            final var msgId = 
producer.newMessage().key(key).value(value).sendAsync().join();
+            log.info("Sent {} => {} to {}", key, value, msgId);
+        };
+
+        send.accept("key-0", "value");
+        for (int i = 0; i < 3; i++) {
+            send.accept("key-1", "value-" + i);
+        }
+
+        triggerAndWaitCompaction(topic); // update the compaction horizon
+
+        AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {
+            // Simulate the case when the topic is closed during compaction 
phase two
+            CompletableFuture.runAsync(() -> {
+                final var persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).join()
+                        .orElseThrow();
+                persistentTopic.close().join();
+            });
+        };
+        // Send a new message so that the compaction won't be skipped
+        send.accept("key-2", "value-0");
+        send.accept("key-2", "value-1");
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> 
assertFalse(pulsar.getBrokerService().getTopics()
+                .containsKey(TopicName.get(topic).toString())));
+
+        AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+
+        // Messages of "key-2" are not compacted due to the injected failure, 
but the previous messages are read from
+        // the compacted ledger rather than the original ledger.
+        verifyReadKeyValues(topic, true, List.of("key-0", "value", "key-1", 
"value-2", "key-2", "value-0", "key-2",
+                "value-1"));
+        // The original ledger still exists so old values of "key-1" can be 
read
+        verifyReadKeyValues(topic, false, List.of("key-0", "value", "key-1", 
"value-0", "key-1", "value-1", "key-1",
+                "value-2", "key-2", "value-0", "key-2", "value-1"));
+    }
+
+    private void verifyReadKeyValues(String topic, boolean readCompacted, 
List<String> expectedKeyValues)
+            throws Exception {
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)
+                .startMessageId(MessageId.earliest).create();
+        final var keyValues = new ArrayList<String>();
+        while (reader.hasMessageAvailable()) {
+            final var msg = reader.readNext();
+            keyValues.add(msg.getKey());
+            keyValues.add(msg.getValue());
+        }
+        assertEquals(keyValues, expectedKeyValues,
+                readCompacted + " " + String.join(",", keyValues.toArray(new 
String[0])));
+    }
+
+    private void triggerAndWaitCompaction(String topic) throws Exception {
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin.topics().compactionStatus(topic).status, 
LongRunningProcessStatus.Status.SUCCESS));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 4cdd195d493..2ca03c55b30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -20,18 +20,23 @@ package org.apache.pulsar.compaction;
 
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl.MSG_COMPRESSION_TYPE;
 import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
@@ -45,45 +50,49 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")
-public class StrategicCompactionTest extends CompactionTest {
+public class StrategicCompactionTest extends MockedPulsarServiceBaseTest {
+
+    protected ScheduledExecutorService compactionScheduler;
+    protected BookKeeper bk;
     private TopicCompactionStrategy strategy;
     private StrategicTwoPhaseCompactor compactor;
 
-    @BeforeMethod
+    @BeforeClass
     @Override
     public void setup() throws Exception {
-        super.setup();
+        super.internalSetup();
+        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
         compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
         strategy = new 
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
     }
 
+    @AfterClass(alwaysRun = true)
     @Override
-    protected long compact(String topic) throws ExecutionException, 
InterruptedException {
-        return (long) compactor.compact(topic, strategy).get();
-    }
-
-    @Override
-    protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
-            throws ExecutionException, InterruptedException {
-        return (long) compactor.compact(topic, strategy, 
cryptoKeyReader).get();
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+        bk.close();
+        if (compactionScheduler != null) {
+            compactionScheduler.shutdownNow();
+        }
     }
 
-    @Override
-    protected PublishingOrderCompactor getCompactor() {
-        return compactor;
+    private long compact(String topic) throws ExecutionException, 
InterruptedException {
+        return (long) compactor.compact(topic, strategy).get();
     }
 
-
     @Test
     public void testNumericOrderCompaction() throws Exception {
 
         strategy = new NumericOrderCompactionStrategy();
 
-        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        String topic = 
"persistent://my-property/use/my-ns/numeric-order-compaction";
         final int numMessages = 50;
         final int maxKeys = 5;
 

Reply via email to