This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c82825bb9b8 [feat][broker][PIP-278] Support pluggable topic compaction 
service - part1 (#20645)
c82825bb9b8 is described below

commit c82825bb9b8ef98e2b120bcdd65a7d5ddb134549
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Jun 30 16:35:47 2023 +0800

    [feat][broker][PIP-278] Support pluggable topic compaction service - part1 
(#20645)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 ++
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../broker/service/AbstractBaseDispatcher.java     |   4 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 ...ption.java => PulsarCompactorSubscription.java} |   8 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  10 +-
 .../compaction/CompactionServiceFactory.java       |  48 +++++++++
 .../compaction/PulsarCompactionServiceFactory.java |  84 +++++++++++++++
 .../compaction/PulsarTopicCompactionService.java   | 111 +++++++++++++++++++
 .../pulsar/compaction/TopicCompactionService.java  |  63 +++++++++++
 .../pulsar/broker/service/PersistentTopicTest.java |   6 +-
 .../compaction/TopicCompactionServiceTest.java     | 118 +++++++++++++++++++++
 12 files changed, 449 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9398d349be7..b41a562fbd7 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3166,6 +3166,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The class name of the factory that implements the topic 
compaction service."
+    )
+    private String compactionServiceFactoryClassName = 
"org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
+
     /**** --- KeyStore TLS config variables. --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 00dfad40191..2b64a8cfb5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1470,7 +1470,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return bkClientFactory;
     }
 
-    protected synchronized ScheduledExecutorService getCompactorExecutor() {
+    public synchronized ScheduledExecutorService getCompactorExecutor() {
         if (this.compactorExecutor == null) {
             compactorExecutor = Executors.newSingleThreadScheduledExecutor(
                     new ExecutorProvider.ExtendedThreadFactory("compaction"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 8f6caa7a208..437a6527e85 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -37,10 +37,10 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -301,7 +301,7 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
     }
 
     private void individualAcknowledgeMessageIfNeeded(Position position, 
Map<String, Long> properties) {
-        if (!(subscription instanceof CompactorSubscription)) {
+        if (!(subscription instanceof PulsarCompactorSubscription)) {
             
subscription.acknowledgeMessage(Collections.singletonList(position), 
AckType.Individual, properties);
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2a0c229daf6..12691d1c677 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -483,7 +483,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             boolean replicated, Map<String, String> subscriptionProperties) {
         Objects.requireNonNull(compactedTopic);
         if (isCompactionSubscription(subscriptionName)) {
-            return new CompactorSubscription(this, compactedTopic, 
subscriptionName, cursor);
+            return new PulsarCompactorSubscription(this, compactedTopic, 
subscriptionName, cursor);
         } else {
             return new PersistentSubscription(this, subscriptionName, cursor, 
replicated, subscriptionProperties);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
similarity index 93%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
index ec34aeffbec..dbb09f6ac39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
@@ -32,11 +32,11 @@ import org.apache.pulsar.compaction.Compactor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CompactorSubscription extends PersistentSubscription {
+public class PulsarCompactorSubscription extends PersistentSubscription {
     private final CompactedTopic compactedTopic;
 
-    public CompactorSubscription(PersistentTopic topic, CompactedTopic 
compactedTopic,
-                                 String subscriptionName, ManagedCursor 
cursor) {
+    public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic 
compactedTopic,
+                                       String subscriptionName, ManagedCursor 
cursor) {
         super(topic, subscriptionName, cursor, false);
         
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
         this.compactedTopic = compactedTopic;
@@ -106,5 +106,5 @@ public class CompactorSubscription extends 
PersistentSubscription {
         }
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(CompactorSubscription.class);
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarCompactorSubscription.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index c8114f9adb6..e2d3de9c19a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -60,7 +61,7 @@ public class CompactedTopicImpl implements CompactedTopic {
     private final BookKeeper bk;
 
     private PositionImpl compactionHorizon = null;
-    private CompletableFuture<CompactedTopicContext> compactedTopicContext = 
null;
+    private volatile CompletableFuture<CompactedTopicContext> 
compactedTopicContext = null;
 
     public CompactedTopicImpl(BookKeeper bk) {
         this.bk = bk;
@@ -258,7 +259,7 @@ public class CompactedTopicImpl implements CompactedTopic {
         return promise;
     }
 
-    private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, 
long from, long to) {
+    static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long 
from, long to) {
         CompletableFuture<Enumeration<LedgerEntry>> promise = new 
CompletableFuture<>();
 
         lh.asyncReadEntries(from, to,
@@ -320,6 +321,11 @@ public class CompactedTopicImpl implements CompactedTopic {
     public synchronized Optional<Position> getCompactionHorizon() {
         return Optional.ofNullable(this.compactionHorizon);
     }
+
+    @Nullable
+    public CompletableFuture<CompactedTopicContext> 
getCompactedTopicContextFuture() {
+        return compactedTopicContext;
+    }
     private static final Logger log = 
LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
new file mode 100644
index 00000000000..de1abfbea95
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+
+@Beta
[email protected]
+public interface CompactionServiceFactory extends AutoCloseable {
+
+    /**
+     * Initialize the compaction service factory.
+     *
+     * @param pulsarService
+     *            the pulsar service instance
+     * @return a future represents the initialization result
+     */
+    CompletableFuture<Void> initialize(@Nonnull PulsarService pulsarService);
+
+    /**
+     * Create a new topic compaction service for topic.
+     *
+     * @param topic
+     *            the topic name
+     * @return a future represents the topic compaction service
+     */
+    CompletableFuture<TopicCompactionService> 
newTopicCompactionService(@Nonnull String topic);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
new file mode 100644
index 00000000000..dd817ca35f1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+
+public class PulsarCompactionServiceFactory implements 
CompactionServiceFactory {
+
+    private PulsarService pulsarService;
+
+    private volatile Compactor compactor;
+
+    @VisibleForTesting
+    public Compactor getCompactor() throws PulsarServerException {
+        if (compactor == null) {
+            synchronized (this) {
+                if (compactor == null) {
+                    compactor = newCompactor();
+                }
+            }
+        }
+        return compactor;
+    }
+
+    @Nullable
+    public Compactor getNullableCompactor() {
+        return compactor;
+    }
+
+    protected Compactor newCompactor() throws PulsarServerException {
+        return new TwoPhaseCompactor(pulsarService.getConfiguration(),
+                pulsarService.getClient(), pulsarService.getBookKeeperClient(),
+                pulsarService.getCompactorExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> initialize(@Nonnull PulsarService 
pulsarService) {
+        Objects.requireNonNull(pulsarService);
+        this.pulsarService = pulsarService;
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<TopicCompactionService> 
newTopicCompactionService(@Nonnull String topic) {
+        Objects.requireNonNull(topic);
+        PulsarTopicCompactionService pulsarTopicCompactionService =
+                new PulsarTopicCompactionService(topic, 
pulsarService.getBookKeeperClient(), () -> {
+                    try {
+                        return this.getCompactor();
+                    } catch (Throwable e) {
+                        throw new CompletionException(e);
+                    }
+                });
+        return CompletableFuture.completedFuture(pulsarTopicCompactionService);
+    }
+
+    @Override
+    public void close() throws Exception {
+        // noop
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
new file mode 100644
index 00000000000..0a8bf9d69a2
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
+import static 
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
+import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+
+
+public class PulsarTopicCompactionService implements TopicCompactionService {
+
+    private final String topic;
+
+    private final CompactedTopicImpl compactedTopic;
+
+    private final Supplier<Compactor> compactorSupplier;
+
+    public PulsarTopicCompactionService(String topic, BookKeeper bookKeeper,
+                                        Supplier<Compactor> compactorSupplier) 
{
+        this.topic = topic;
+        this.compactedTopic = new CompactedTopicImpl(bookKeeper);
+        this.compactorSupplier = compactorSupplier;
+    }
+
+    @Override
+    public CompletableFuture<Void> compact() {
+        Compactor compactor;
+        try {
+            compactor = compactorSupplier.get();
+        } catch (Throwable e) {
+            return CompletableFuture.failedFuture(e);
+        }
+        return compactor.compact(topic).thenApply(x -> null);
+    }
+
+    @Override
+    public CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull 
Position startPosition,
+                                                               int 
numberOfEntriesToRead) {
+        Objects.requireNonNull(startPosition);
+        checkArgument(numberOfEntriesToRead > 0);
+
+        CompletableFuture<List<Entry>> resultFuture = new 
CompletableFuture<>();
+
+        
Objects.requireNonNull(compactedTopic.getCompactedTopicContextFuture()).thenCompose(
+                (context) -> findStartPoint((PositionImpl) startPosition, 
context.ledger.getLastAddConfirmed(),
+                        context.cache).thenCompose((startPoint) -> {
+                    if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == 
NEWER_THAN_COMPACTED) {
+                        return 
CompletableFuture.completedFuture(Collections.emptyList());
+                    }
+                    long endPoint =
+                            Math.min(context.ledger.getLastAddConfirmed(), 
startPoint + numberOfEntriesToRead);
+                    return CompactedTopicImpl.readEntries(context.ledger, 
startPoint, endPoint);
+                })).whenComplete((result, ex) -> {
+                    if (ex == null) {
+                        resultFuture.complete(result);
+                    } else {
+                        ex = FutureUtil.unwrapCompletionException(ex);
+                        if (ex instanceof NoSuchElementException) {
+                            resultFuture.complete(Collections.emptyList());
+                        } else {
+                            resultFuture.completeExceptionally(ex);
+                        }
+                    }
+                });
+
+        return resultFuture;
+    }
+
+    @Override
+    public CompletableFuture<Entry> readLastCompactedEntry() {
+        return compactedTopic.readLastEntryOfCompactedLedger();
+    }
+
+    @Override
+    public CompletableFuture<Position> getLastCompactedPosition() {
+        return 
CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null));
+    }
+
+    public CompactedTopicImpl getCompactedTopic() {
+        return compactedTopic;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
new file mode 100644
index 00000000000..6b64b9ce0fd
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+
+@Beta
[email protected]
+public interface TopicCompactionService {
+    /**
+     * Compact the topic.
+     * Topic Compaction is a key-based retention mechanism. It keeps the most 
recent value for a given key and
+     * user reads compacted data from TopicCompactionService.
+     *
+     * @return a future that will be completed when the compaction is done.
+     */
+    CompletableFuture<Void> compact();
+
+    /**
+     * Read the compacted entries from the TopicCompactionService.
+     *
+     * @param startPosition         the position to start reading from.
+     * @param numberOfEntriesToRead the maximum number of entries to read.
+     * @return a future that will be completed with the list of entries, this 
list can be null.
+     */
+    CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position 
startPosition, int numberOfEntriesToRead);
+
+    /**
+     * Read the last compacted entry from the TopicCompactionService.
+     *
+     * @return a future that will be completed with the compacted last entry, 
this entry can be null.
+     */
+    CompletableFuture<Entry> readLastCompactedEntry();
+
+    /**
+     * Get the last compacted position from the TopicCompactionService.
+     *
+     * @return a future that will be completed with the last compacted 
position, this position can be null.
+     */
+    CompletableFuture<Position> getLastCompactedPosition();
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 45ef58bb703..fefed1aaa0a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -93,7 +93,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -1792,7 +1792,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
         when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
                 
.thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
-        PersistentSubscription sub = new CompactorSubscription(topic, 
compactedTopic,
+        PersistentSubscription sub = new PulsarCompactorSubscription(topic, 
compactedTopic,
                 Compactor.COMPACTION_SUBSCRIPTION,
                 cursorMock);
         PositionImpl position = new PositionImpl(1, 1);
@@ -1816,7 +1816,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
         when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
                 .thenReturn(CompletableFuture.completedFuture(null));
-        new CompactorSubscription(topic, compactedTopic, 
Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
+        new PulsarCompactorSubscription(topic, compactedTopic, 
Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
         verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, 
ledgerId);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
new file mode 100644
index 00000000000..5810e0180d0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static 
org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.Test;
+
+public class TopicCompactionServiceTest extends CompactorTest {
+
+    @Test
+    public void test() throws PulsarClientException, PulsarAdminException {
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        String defaultTenant = "prop-xyz";
+        admin.tenants().createTenant(defaultTenant, tenantInfo);
+        String defaultNamespace = defaultTenant + "/ns1";
+        admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
+
+        String topic = "persistent://prop-xyz/ns1/my-topic";
+
+        PulsarTopicCompactionService service = new 
PulsarTopicCompactionService(topic, bk, () -> compactor);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        producer.newMessage()
+                .key("a")
+                .value("A_1".getBytes())
+                .send();
+        producer.newMessage()
+                .key("b")
+                .value("B_1".getBytes())
+                .send();
+        producer.newMessage()
+                .key("a")
+                .value("A_2".getBytes())
+                .send();
+        producer.newMessage()
+                .key("b")
+                .value("B_2".getBytes())
+                .send();
+        producer.newMessage()
+                .key("b")
+                .value("B_3".getBytes())
+                .send();
+
+        producer.flush();
+
+        service.compact().join();
+
+
+        CompactedTopicImpl compactedTopic = service.getCompactedTopic();
+
+        Long compactedLedger = 
admin.topics().getInternalStats(topic).cursors.get(COMPACTION_SUBSCRIPTION).properties.get(
+                COMPACTED_TOPIC_LEDGER_PROPERTY);
+        String markDeletePosition =
+                
admin.topics().getInternalStats(topic).cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition;
+        String[] split = markDeletePosition.split(":");
+        
compactedTopic.newCompactedLedger(PositionImpl.get(Long.valueOf(split[0]), 
Long.valueOf(split[1])),
+                compactedLedger).join();
+
+        Position lastCompactedPosition = 
service.getLastCompactedPosition().join();
+        
assertEquals(admin.topics().getInternalStats(topic).lastConfirmedEntry, 
lastCompactedPosition.toString());
+
+        List<Entry> entries = 
service.readCompactedEntries(PositionImpl.EARLIEST, 4).join();
+        assertEquals(entries.size(), 2);
+        entries.stream().map(e -> {
+            try {
+                return MessageImpl.deserialize(e.getDataBuffer());
+            } catch (IOException ex) {
+                throw new RuntimeException(ex);
+            }
+        }).forEach(message -> {
+            String data = new String(message.getData());
+            if (Objects.equals(message.getKey(), "a")) {
+                assertEquals(data, "A_2");
+            } else {
+                assertEquals(data, "B_3");
+            }
+        });
+    }
+}

Reply via email to