This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c82825bb9b8 [feat][broker][PIP-278] Support pluggable topic compaction
service - part1 (#20645)
c82825bb9b8 is described below
commit c82825bb9b8ef98e2b120bcdd65a7d5ddb134549
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Jun 30 16:35:47 2023 +0800
[feat][broker][PIP-278] Support pluggable topic compaction service - part1
(#20645)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../broker/service/AbstractBaseDispatcher.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 2 +-
...ption.java => PulsarCompactorSubscription.java} | 8 +-
.../pulsar/compaction/CompactedTopicImpl.java | 10 +-
.../compaction/CompactionServiceFactory.java | 48 +++++++++
.../compaction/PulsarCompactionServiceFactory.java | 84 +++++++++++++++
.../compaction/PulsarTopicCompactionService.java | 111 +++++++++++++++++++
.../pulsar/compaction/TopicCompactionService.java | 63 +++++++++++
.../pulsar/broker/service/PersistentTopicTest.java | 6 +-
.../compaction/TopicCompactionServiceTest.java | 118 +++++++++++++++++++++
12 files changed, 449 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9398d349be7..b41a562fbd7 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3166,6 +3166,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The class name of the factory that implements the topic
compaction service."
+ )
+ private String compactionServiceFactoryClassName =
"org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
+
/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 00dfad40191..2b64a8cfb5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1470,7 +1470,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return bkClientFactory;
}
- protected synchronized ScheduledExecutorService getCompactorExecutor() {
+ public synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
compactorExecutor = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("compaction"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 8f6caa7a208..437a6527e85 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -37,10 +37,10 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -301,7 +301,7 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
}
private void individualAcknowledgeMessageIfNeeded(Position position,
Map<String, Long> properties) {
- if (!(subscription instanceof CompactorSubscription)) {
+ if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(position),
AckType.Individual, properties);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2a0c229daf6..12691d1c677 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -483,7 +483,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
boolean replicated, Map<String, String> subscriptionProperties) {
Objects.requireNonNull(compactedTopic);
if (isCompactionSubscription(subscriptionName)) {
- return new CompactorSubscription(this, compactedTopic,
subscriptionName, cursor);
+ return new PulsarCompactorSubscription(this, compactedTopic,
subscriptionName, cursor);
} else {
return new PersistentSubscription(this, subscriptionName, cursor,
replicated, subscriptionProperties);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
similarity index 93%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
index ec34aeffbec..dbb09f6ac39 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java
@@ -32,11 +32,11 @@ import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CompactorSubscription extends PersistentSubscription {
+public class PulsarCompactorSubscription extends PersistentSubscription {
private final CompactedTopic compactedTopic;
- public CompactorSubscription(PersistentTopic topic, CompactedTopic
compactedTopic,
- String subscriptionName, ManagedCursor
cursor) {
+ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic
compactedTopic,
+ String subscriptionName, ManagedCursor
cursor) {
super(topic, subscriptionName, cursor, false);
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;
@@ -106,5 +106,5 @@ public class CompactorSubscription extends
PersistentSubscription {
}
}
- private static final Logger log =
LoggerFactory.getLogger(CompactorSubscription.class);
+ private static final Logger log =
LoggerFactory.getLogger(PulsarCompactorSubscription.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index c8114f9adb6..e2d3de9c19a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -60,7 +61,7 @@ public class CompactedTopicImpl implements CompactedTopic {
private final BookKeeper bk;
private PositionImpl compactionHorizon = null;
- private CompletableFuture<CompactedTopicContext> compactedTopicContext =
null;
+ private volatile CompletableFuture<CompactedTopicContext>
compactedTopicContext = null;
public CompactedTopicImpl(BookKeeper bk) {
this.bk = bk;
@@ -258,7 +259,7 @@ public class CompactedTopicImpl implements CompactedTopic {
return promise;
}
- private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh,
long from, long to) {
+ static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long
from, long to) {
CompletableFuture<Enumeration<LedgerEntry>> promise = new
CompletableFuture<>();
lh.asyncReadEntries(from, to,
@@ -320,6 +321,11 @@ public class CompactedTopicImpl implements CompactedTopic {
public synchronized Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}
+
+ /**
+  * Returns the current value of the {@code compactedTopicContext} field without waiting on it.
+  *
+  * @return the future held in {@code compactedTopicContext}, or {@code null} while the field still has
+  *         its initial {@code null} value
+  */
+ @Nullable
+ public CompletableFuture<CompactedTopicContext>
getCompactedTopicContextFuture() {
+ return compactedTopicContext;
+ }
private static final Logger log =
LoggerFactory.getLogger(CompactedTopicImpl.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
new file mode 100644
index 00000000000..de1abfbea95
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionServiceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+
+/**
+ * Factory that creates the {@link TopicCompactionService} for a topic.
+ *
+ * <p>{@link #initialize(PulsarService)} passes the {@link PulsarService} instance to the implementation and
+ * {@link #newTopicCompactionService(String)} creates the compaction service for a single topic. Both report
+ * their outcome through a {@link CompletableFuture}.
+ */
+@Beta
+@InterfaceAudience.Public
+public interface CompactionServiceFactory extends AutoCloseable {
+
+    /**
+     * Initialize the compaction service factory.
+     *
+     * @param pulsarService
+     *            the pulsar service instance
+     * @return a future that represents the initialization result
+     */
+    CompletableFuture<Void> initialize(@Nonnull PulsarService pulsarService);
+
+    /**
+     * Create a new topic compaction service for the given topic.
+     *
+     * @param topic
+     *            the topic name
+     * @return a future that completes with the topic compaction service
+     */
+    CompletableFuture<TopicCompactionService> newTopicCompactionService(@Nonnull String topic);
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
new file mode 100644
index 00000000000..dd817ca35f1
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+
+/**
+ * {@link CompactionServiceFactory} that hands out {@link PulsarTopicCompactionService} instances which all
+ * obtain their {@link Compactor} from this factory. The compactor is created lazily on first request.
+ */
+public class PulsarCompactionServiceFactory implements CompactionServiceFactory {
+
+    // Assigned by initialize(); stays null until then.
+    private PulsarService pulsarService;
+
+    // Created on demand by getCompactor(); volatile because it is read outside the synchronized block.
+    private volatile Compactor compactor;
+
+    /**
+     * Returns the compactor, creating it through {@link #newCompactor()} if none has been created yet.
+     *
+     * @return the compactor held by this factory
+     * @throws PulsarServerException if {@link #newCompactor()} fails
+     */
+    @VisibleForTesting
+    public Compactor getCompactor() throws PulsarServerException {
+        Compactor existing = compactor;
+        if (existing != null) {
+            return existing;
+        }
+        synchronized (this) {
+            // Re-check under the lock so that only one caller creates the compactor.
+            if (compactor == null) {
+                compactor = newCompactor();
+            }
+            return compactor;
+        }
+    }
+
+    /**
+     * Returns the compactor only if it has already been created; never triggers creation.
+     *
+     * @return the compactor, or {@code null} if {@link #getCompactor()} has not created one yet
+     */
+    @Nullable
+    public Compactor getNullableCompactor() {
+        return compactor;
+    }
+
+    /**
+     * Builds the {@link TwoPhaseCompactor} from the configuration, client, BookKeeper client and compactor
+     * executor of the {@link PulsarService} given to {@link #initialize(PulsarService)}.
+     *
+     * @return a new compactor
+     * @throws PulsarServerException if one of the required broker components cannot be obtained
+     */
+    protected Compactor newCompactor() throws PulsarServerException {
+        return new TwoPhaseCompactor(
+                pulsarService.getConfiguration(),
+                pulsarService.getClient(),
+                pulsarService.getBookKeeperClient(),
+                pulsarService.getCompactorExecutor());
+    }
+
+    /**
+     * Stores the {@link PulsarService}; no other work is done here.
+     *
+     * @param pulsarService the pulsar service instance, must not be {@code null}
+     * @return an already completed future
+     */
+    @Override
+    public CompletableFuture<Void> initialize(@Nonnull PulsarService pulsarService) {
+        this.pulsarService = Objects.requireNonNull(pulsarService);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Creates a {@link PulsarTopicCompactionService} for {@code topic} whose compactor is resolved through
+     * {@link #getCompactor()} each time the service asks for it.
+     *
+     * @param topic the topic name, must not be {@code null}
+     * @return an already completed future holding the new service
+     */
+    @Override
+    public CompletableFuture<TopicCompactionService> newTopicCompactionService(@Nonnull String topic) {
+        Objects.requireNonNull(topic);
+        TopicCompactionService service = new PulsarTopicCompactionService(
+                topic, pulsarService.getBookKeeperClient(), this::getCompactorUnchecked);
+        return CompletableFuture.completedFuture(service);
+    }
+
+    /**
+     * Supplier-friendly variant of {@link #getCompactor()}: any failure is rethrown wrapped in a
+     * {@link CompletionException} because a supplier cannot throw checked exceptions.
+     */
+    private Compactor getCompactorUnchecked() {
+        try {
+            return getCompactor();
+        } catch (Throwable e) {
+            throw new CompletionException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // noop
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
new file mode 100644
index 00000000000..0a8bf9d69a2
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
+import static
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
+import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+
+
+/**
+ * {@link TopicCompactionService} implementation backed by a {@link CompactedTopicImpl}.
+ *
+ * <p>Compaction is delegated to the {@link Compactor} obtained from the supplied {@link Supplier}; reads go
+ * through the {@link CompactedTopicImpl} created in the constructor.
+ */
+public class PulsarTopicCompactionService implements TopicCompactionService {
+
+    private final String topic;
+
+    private final CompactedTopicImpl compactedTopic;
+
+    private final Supplier<Compactor> compactorSupplier;
+
+    /**
+     * @param topic the name of the topic passed to {@link Compactor#compact(String)}
+     * @param bookKeeper the BookKeeper client handed to the underlying {@link CompactedTopicImpl}
+     * @param compactorSupplier supplies the {@link Compactor}; it is invoked on every {@link #compact()} call
+     */
+    public PulsarTopicCompactionService(String topic, BookKeeper bookKeeper,
+                                        Supplier<Compactor> compactorSupplier) {
+        this.topic = topic;
+        this.compactedTopic = new CompactedTopicImpl(bookKeeper);
+        this.compactorSupplier = compactorSupplier;
+    }
+
+    /**
+     * Compacts the topic with the compactor provided by the supplier.
+     *
+     * @return a future that completes with {@code null} when {@link Compactor#compact(String)} completes; it
+     *         fails with whatever the supplier threw or the compaction failed with
+     */
+    @Override
+    public CompletableFuture<Void> compact() {
+        Compactor compactor;
+        try {
+            compactor = compactorSupplier.get();
+        } catch (Throwable e) {
+            // Report supplier failures through the returned future rather than throwing to the caller.
+            return CompletableFuture.failedFuture(e);
+        }
+        return compactor.compact(topic).thenApply(x -> null);
+    }
+
+    /**
+     * Reads at most {@code numberOfEntriesToRead} entries from the compacted ledger, starting at the entry
+     * located by {@code findStartPoint} for {@code startPosition}.
+     *
+     * @param startPosition the position to start reading from; must be a {@link PositionImpl}
+     * @param numberOfEntriesToRead the maximum number of entries to read, must be greater than zero
+     * @return a future completed with the entries read; it completes with an empty list when the start point
+     *         is {@code COMPACT_LEDGER_EMPTY} or {@code NEWER_THAN_COMPACTED}, or when the read fails with a
+     *         {@link NoSuchElementException}
+     * @throws NullPointerException if {@code startPosition} is {@code null}, or if the compacted topic has no
+     *         context future yet ({@link CompactedTopicImpl#getCompactedTopicContextFuture()} returned null)
+     * @throws IllegalArgumentException if {@code numberOfEntriesToRead} is not positive
+     */
+    @Override
+    public CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position startPosition,
+                                                               int numberOfEntriesToRead) {
+        Objects.requireNonNull(startPosition);
+        checkArgument(numberOfEntriesToRead > 0);
+
+        CompletableFuture<List<Entry>> resultFuture = new CompletableFuture<>();
+
+        Objects.requireNonNull(compactedTopic.getCompactedTopicContextFuture()).thenCompose(
+                (context) -> findStartPoint((PositionImpl) startPosition, context.ledger.getLastAddConfirmed(),
+                        context.cache).thenCompose((startPoint) -> {
+                    if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == NEWER_THAN_COMPACTED) {
+                        return CompletableFuture.completedFuture(Collections.emptyList());
+                    }
+                    // The upper bound handed to readEntries is inclusive (it is forwarded to
+                    // LedgerHandle#asyncReadEntries), so the last entry of a batch of n entries that
+                    // starts at startPoint is startPoint + n - 1, capped at the last confirmed entry.
+                    long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
+                            startPoint + numberOfEntriesToRead - 1);
+                    return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint);
+                })).whenComplete((result, ex) -> {
+                    if (ex == null) {
+                        resultFuture.complete(result);
+                    } else {
+                        ex = FutureUtil.unwrapCompletionException(ex);
+                        if (ex instanceof NoSuchElementException) {
+                            // Treated as "nothing to read" instead of an error.
+                            resultFuture.complete(Collections.emptyList());
+                        } else {
+                            resultFuture.completeExceptionally(ex);
+                        }
+                    }
+                });
+
+        return resultFuture;
+    }
+
+    /**
+     * Reads the last entry of the compacted ledger.
+     *
+     * @return the future returned by {@link CompactedTopicImpl#readLastEntryOfCompactedLedger()}
+     */
+    @Override
+    public CompletableFuture<Entry> readLastCompactedEntry() {
+        return compactedTopic.readLastEntryOfCompactedLedger();
+    }
+
+    /**
+     * Returns the compaction horizon currently recorded by the compacted topic.
+     *
+     * @return an already completed future holding the compaction horizon, or {@code null} if none is set
+     */
+    @Override
+    public CompletableFuture<Position> getLastCompactedPosition() {
+        return CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null));
+    }
+
+    /**
+     * @return the {@link CompactedTopicImpl} this service reads from
+     */
+    public CompactedTopicImpl getCompactedTopic() {
+        return compactedTopic;
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
new file mode 100644
index 00000000000..6b64b9ce0fd
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+
+/**
+ * Per-topic service that compacts a topic and serves reads of the compacted data.
+ *
+ * <p>All operations are asynchronous and report their result through a {@link CompletableFuture}.
+ */
+@Beta
+@InterfaceAudience.Public
+public interface TopicCompactionService {
+    /**
+     * Compact the topic.
+     * Topic compaction is a key-based retention mechanism: it keeps the most recent value for each key, and
+     * users read the compacted data through this service.
+     *
+     * @return a future that will be completed when the compaction is done.
+     */
+    CompletableFuture<Void> compact();
+
+    /**
+     * Read the compacted entries from the TopicCompactionService.
+     *
+     * @param startPosition the position to start reading from.
+     * @param numberOfEntriesToRead the maximum number of entries to read.
+     * @return a future that will be completed with the list of entries; the list can be null.
+     */
+    CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position startPosition, int numberOfEntriesToRead);
+
+    /**
+     * Read the last compacted entry from the TopicCompactionService.
+     *
+     * @return a future that will be completed with the last compacted entry; the entry can be null.
+     */
+    CompletableFuture<Entry> readLastCompactedEntry();
+
+    /**
+     * Get the last compacted position from the TopicCompactionService.
+     *
+     * @return a future that will be completed with the last compacted position; the position can be null.
+     */
+    CompletableFuture<Position> getLastCompactedPosition();
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 45ef58bb703..fefed1aaa0a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -93,7 +93,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -1792,7 +1792,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
CompactedTopic compactedTopic = mock(CompactedTopic.class);
when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
- PersistentSubscription sub = new CompactorSubscription(topic,
compactedTopic,
+ PersistentSubscription sub = new PulsarCompactorSubscription(topic,
compactedTopic,
Compactor.COMPACTION_SUBSCRIPTION,
cursorMock);
PositionImpl position = new PositionImpl(1, 1);
@@ -1816,7 +1816,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
CompactedTopic compactedTopic = mock(CompactedTopic.class);
when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(null));
- new CompactorSubscription(topic, compactedTopic,
Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
+ new PulsarCompactorSubscription(topic, compactedTopic,
Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position,
ledgerId);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
new file mode 100644
index 00000000000..5810e0180d0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static
org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.testng.Assert.assertEquals;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.Test;
+
+public class TopicCompactionServiceTest extends CompactorTest {
+
+    /**
+     * Sends one keyed message through {@code producer} using the synchronous {@code send()}.
+     */
+    private static void sendKeyedMessage(Producer<byte[]> producer, String key, String value)
+            throws PulsarClientException {
+        producer.newMessage()
+                .key(key)
+                .value(value.getBytes())
+                .send();
+    }
+
+    /**
+     * Publishes five messages over the keys "a" and "b", compacts the topic through a
+     * {@link PulsarTopicCompactionService}, then checks the reported last compacted position and that the
+     * compacted read returns exactly one entry per key carrying that key's latest value.
+     */
+    @Test
+    public void test() throws PulsarClientException, PulsarAdminException {
+        String defaultTenant = "prop-xyz";
+        String defaultNamespace = defaultTenant + "/ns1";
+        String topic = "persistent://prop-xyz/ns1/my-topic";
+
+        // Cluster, tenant and namespace have to exist before the topic can be used.
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant(defaultTenant, tenantInfo);
+        admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
+
+        PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        sendKeyedMessage(producer, "a", "A_1");
+        sendKeyedMessage(producer, "b", "B_1");
+        sendKeyedMessage(producer, "a", "A_2");
+        sendKeyedMessage(producer, "b", "B_2");
+        sendKeyedMessage(producer, "b", "B_3");
+
+        producer.flush();
+
+        service.compact().join();
+
+        CompactedTopicImpl compactedTopic = service.getCompactedTopic();
+
+        // Point the compacted topic at the ledger and position recorded on the compaction cursor.
+        Long compactedLedger = admin.topics().getInternalStats(topic)
+                .cursors.get(COMPACTION_SUBSCRIPTION).properties.get(COMPACTED_TOPIC_LEDGER_PROPERTY);
+        String markDeletePosition = admin.topics().getInternalStats(topic)
+                .cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition;
+        String[] ledgerAndEntry = markDeletePosition.split(":");
+        PositionImpl compactionHorizon =
+                PositionImpl.get(Long.parseLong(ledgerAndEntry[0]), Long.parseLong(ledgerAndEntry[1]));
+        compactedTopic.newCompactedLedger(compactionHorizon, compactedLedger).join();
+
+        Position lastCompactedPosition = service.getLastCompactedPosition().join();
+        assertEquals(admin.topics().getInternalStats(topic).lastConfirmedEntry,
+                lastCompactedPosition.toString());
+
+        List<Entry> entries = service.readCompactedEntries(PositionImpl.EARLIEST, 4).join();
+        assertEquals(entries.size(), 2);
+        for (Entry entry : entries) {
+            MessageImpl<?> message;
+            try {
+                message = MessageImpl.deserialize(entry.getDataBuffer());
+            } catch (IOException ex) {
+                throw new RuntimeException(ex);
+            }
+            String data = new String(message.getData());
+            String expected = Objects.equals(message.getKey(), "a") ? "A_2" : "B_3";
+            assertEquals(data, expected);
+        }
+    }
+}