This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1c495e190b3 [feat] PIP-352: Event time based compaction (#22517)
1c495e190b3 is described below
commit 1c495e190b3c569e9dfd44acef2a697c93a1f771
Author: Marek Czajkowski <[email protected]>
AuthorDate: Fri Aug 23 16:49:15 2024 +0200
[feat] PIP-352: Event time based compaction (#22517)
---
conf/broker.conf | 6 +
conf/standalone.conf | 6 +
.../pulsar/client/impl/RawBatchConverter.java | 33 +-
.../compaction/AbstractTwoPhaseCompactor.java | 439 +++++++++++++++++++
.../apache/pulsar/compaction/CompactorTool.java | 2 +-
.../EventTimeCompactionServiceFactory.java | 33 ++
.../pulsar/compaction/EventTimeOrderCompactor.java | 161 +++++++
.../pulsar/compaction/MessageCompactionData.java | 23 +
.../compaction/PublishingOrderCompactor.java | 127 ++++++
.../compaction/PulsarCompactionServiceFactory.java | 2 +-
.../compaction/StrategicTwoPhaseCompactor.java | 2 +-
.../pulsar/compaction/TwoPhaseCompactor.java | 470 ---------------------
.../pulsar/compaction/CompactionRetentionTest.java | 4 +-
.../apache/pulsar/compaction/CompactionTest.java | 8 +-
.../apache/pulsar/compaction/CompactorTest.java | 6 +-
.../compaction/EventTimeOrderCompactorTest.java | 201 +++++++++
.../pulsar/compaction/StrategicCompactionTest.java | 2 +-
.../compaction/TopicCompactionServiceTest.java | 4 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 4 +-
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 4 +-
20 files changed, 1037 insertions(+), 500 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index e5d8a32e717..fc32246adea 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -563,6 +563,12 @@ brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
# Whether retain null-key message during topic compaction
topicCompactionRetainNullKey=false
+# Class name of the factory that implements the topic compaction service.
+# If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
+# will create topic compaction service based on message eventTime.
+# By default compaction service is based on message publishing order.
+compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 30b39af8869..ae696410d86 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1318,3 +1318,9 @@ disableBrokerInterceptors=true
# Whether retain null-key message during topic compaction
topicCompactionRetainNullKey=false
+
+# Class name of the factory that implements the topic compaction service.
+# If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
+# will create topic compaction service based on message eventTime.
+# By default compaction service is based on message publishing order.
+compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4c24f6d3036..f41a7aedd59 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -38,6 +38,7 @@ import
org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.compaction.MessageCompactionData;
public class RawBatchConverter {
@@ -51,8 +52,8 @@ public class RawBatchConverter {
return metadata.hasNumMessagesInBatch() &&
metadata.getEncryptionKeysCount() == 0;
}
- public static List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSize(RawMessage msg)
- throws IOException {
+ public static List<MessageCompactionData>
extractMessageCompactionData(RawMessage msg)
+ throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
ByteBuf payload = msg.getHeadersAndPayload();
@@ -64,25 +65,35 @@ public class RawBatchConverter {
int uncompressedSize = metadata.getUncompressedSize();
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
- List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize =
new ArrayList<>();
+ List<MessageCompactionData> messageCompactionDataList = new
ArrayList<>();
SingleMessageMetadata smm = new SingleMessageMetadata();
for (int i = 0; i < batchSize; i++) {
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
-
smm,
-
0, batchSize);
+ smm,
+ 0, batchSize);
MessageId id = new
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
-
msg.getMessageIdData().getEntryId(),
-
msg.getMessageIdData().getPartition(),
- i);
+ msg.getMessageIdData().getEntryId(),
+ msg.getMessageIdData().getPartition(),
+ i);
if (!smm.isCompactedOut()) {
- idsAndKeysAndSize.add(ImmutableTriple.of(id,
- smm.hasPartitionKey() ? smm.getPartitionKey() :
null,
- smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
+ messageCompactionDataList.add(new MessageCompactionData(id,
+ smm.hasPartitionKey() ? smm.getPartitionKey() : null,
+ smm.hasPayloadSize() ? smm.getPayloadSize() : 0,
smm.getEventTime()));
}
singleMessagePayload.release();
}
uncompressedPayload.release();
+ return messageCompactionDataList;
+ }
+
+ public static List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSize(
+ RawMessage msg)
+ throws IOException {
+ List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize =
new ArrayList<>();
+ for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
+ idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(),
mcd.key(), mcd.payloadSize()));
+ }
return idsAndKeysAndSize;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
new file mode 100644
index 00000000000..5b03f270251
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects latest offset for each key in the topic. Then the second pass
+ * writes these values to a ledger.
+ *
+ * <p>The two passes are required to avoid holding the payloads of each of
+ * the latest values in memory, as the payload can be many orders of
+ * magnitude larger than a message id.
+ */
+public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
+
+ private static final Logger log =
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
+ protected static final int MAX_OUTSTANDING = 500;
+ protected final Duration phaseOneLoopReadTimeout;
+ protected final boolean topicCompactionRetainNullKey;
+
+ public AbstractTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ super(conf, pulsar, bk, scheduler);
+ phaseOneLoopReadTimeout = Duration.ofSeconds(
+ conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
+ }
+
+ protected abstract Map<String, MessageId>
toLatestMessageIdForKey(Map<String, T> latestForKey);
+
+ protected abstract boolean compactMessage(String topic, Map<String, T>
latestForKey,
+ RawMessage m, MessageId id);
+
+
+ protected abstract boolean compactBatchMessage(String topic, Map<String, T>
latestForKey,
+ RawMessage m,
+ MessageMetadata metadata, MessageId id);
+
+ @Override
+ protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper
bk) {
+ return reader.hasMessageAvailableAsync()
+ .thenCompose(available -> {
+ if (available) {
+ return phaseOne(reader).thenCompose(
+ (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId,
toLatestMessageIdForKey(r.latestForKey), bk));
+ } else {
+ log.info("Skip compaction of the empty topic {}",
reader.getTopic());
+ return CompletableFuture.completedFuture(-1L);
+ }
+ });
+ }
+
+ private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
+ Map<String, T> latestForKey = new HashMap<>();
+ CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
+
+ reader.getLastMessageIdAsync()
+ .thenAccept(lastMessageId -> {
+ log.info("Commencing phase one of compaction for {}, reading to {}",
+ reader.getTopic(), lastMessageId);
+ // Each entry is processed as a whole, discard the batchIndex part
deliberately.
+ MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
+ MessageIdImpl lastEntryMessageId = new
MessageIdImpl(lastImpl.getLedgerId(),
+ lastImpl.getEntryId(),
+ lastImpl.getPartitionIndex());
+ phaseOneLoop(reader, Optional.empty(), Optional.empty(),
lastEntryMessageId, latestForKey,
+ loopPromise);
+ }).exceptionally(ex -> {
+ loopPromise.completeExceptionally(ex);
+ return null;
+ });
+
+ return loopPromise;
+ }
+
+ private void phaseOneLoop(RawReader reader,
+ Optional<MessageId> firstMessageId,
+ Optional<MessageId> toMessageId,
+ MessageId lastMessageId,
+ Map<String, T> latestForKey,
+ CompletableFuture<PhaseOneResult> loopPromise) {
+ if (loopPromise.isDone()) {
+ return;
+ }
+ CompletableFuture<RawMessage> future = reader.readNextAsync();
+ FutureUtil.addTimeoutHandling(future,
+ phaseOneLoopReadTimeout, scheduler,
+ () -> FutureUtil.createTimeoutException("Timeout", getClass(),
"phaseOneLoop(...)"));
+
+ future.thenAcceptAsync(m -> {
+ try (m) {
+ MessageId id = m.getMessageId();
+ boolean deletedMessage = false;
+ mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
+ MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (Markers.isServerOnlyMarker(metadata)) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ deletedMessage = true;
+ } else if (RawBatchConverter.isReadableBatch(metadata)) {
+ deletedMessage = compactBatchMessage(reader.getTopic(),
latestForKey, m, metadata, id);
+ } else {
+ deletedMessage = compactMessage(reader.getTopic(), latestForKey, m,
id);
+ }
+ MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
+ MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+ if (id.compareTo(lastMessageId) == 0) {
+ loopPromise.complete(new PhaseOneResult(first == null ? id : first,
to == null ? id : to,
+ lastMessageId, latestForKey));
+ } else {
+ phaseOneLoop(reader,
+ Optional.ofNullable(first),
+ Optional.ofNullable(to),
+ lastMessageId,
+ latestForKey, loopPromise);
+ }
+ }
+ }, scheduler).exceptionally(ex -> {
+ loopPromise.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from,
MessageId to,
+ MessageId lastReadId,
+ Map<String, MessageId> latestForKey, BookKeeper bk) {
+ Map<String, byte[]> metadata =
+ LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(),
to.toByteArray());
+ return createLedger(bk, metadata).thenCompose((ledger) -> {
+ log.info(
+ "Commencing phase two of compaction for {}, from {} to {},
compacting {} keys to ledger {}",
+ reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+ return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey,
bk, ledger);
+ });
+ }
+
+ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader,
MessageId from,
+ MessageId to,
+ MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk,
+ LedgerHandle ledger) {
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+
+ reader.seekAsync(from).thenCompose((v) -> {
+ Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+ CompletableFuture<Void> loopPromise = new CompletableFuture<>();
+ phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise, MessageId.earliest);
+ return loopPromise;
+ }).thenCompose((v) -> closeLedger(ledger))
+ .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
+ Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
+ .whenComplete((res, exception) -> {
+ if (exception != null) {
+ deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Cleanup of ledger {} for failed", ledger,
exception2);
+ }
+ // complete with original exception
+ promise.completeExceptionally(exception);
+ });
+ } else {
+ promise.complete(ledger.getId());
+ }
+ });
+ return promise;
+ }
+
+ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
+ LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
+ MessageId lastCompactedMessageId) {
+ if (promise.isDone()) {
+ return;
+ }
+ reader.readNextAsync().thenAcceptAsync(m -> {
+ if (promise.isDone()) {
+ m.close();
+ return;
+ }
+
+ if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
+ m.close();
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise,
lastCompactedMessageId);
+ return;
+ }
+
+ try {
+ MessageId id = m.getMessageId();
+ Optional<RawMessage> messageToAdd = Optional.empty();
+ mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
+ MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (Markers.isServerOnlyMarker(metadata)) {
+ messageToAdd = Optional.empty();
+ } else if (RawBatchConverter.isReadableBatch(metadata)) {
+ try {
+ messageToAdd = rebatchMessage(reader.getTopic(),
+ m, (key, subid) -> subid.equals(latestForKey.get(key)),
+ topicCompactionRetainNullKey);
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}. Whole batch will be
included in output",
+ id, ioe);
+ messageToAdd = Optional.of(m);
+ }
+ } else {
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ MessageId msg;
+ if (keyAndSize == null) {
+ messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) :
Optional.empty();
+ } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+ && msg.equals(id)) { // consider message only if present into
latestForKey map
+ if (keyAndSize.getRight() <= 0) {
+ promise.completeExceptionally(new IllegalArgumentException(
+ "Compaction phase found empty record from sorted key-map"));
+ }
+ messageToAdd = Optional.of(m);
+ }
+ }
+
+ if (messageToAdd.isPresent()) {
+ RawMessage message = messageToAdd.get();
+ try {
+ outstanding.acquire();
+ CompletableFuture<Void> addFuture = addToCompactedLedger(lh,
message, reader.getTopic())
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+ promise.completeExceptionally(exception2);
+ }
+ });
+ if (to.equals(id)) {
+ // make sure all inflight writes have finished
+ outstanding.acquire(MAX_OUTSTANDING);
+ addFuture.whenComplete((res, exception2) -> {
+ if (exception2 == null) {
+ promise.complete(null);
+ }
+ });
+ return;
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(ie);
+ } finally {
+ if (message != m) {
+ message.close();
+ }
+ }
+ } else if (to.equals(id)) {
+ // Reached to last-id and phase-one found it deleted-message while
iterating on ledger so,
+ // not present under latestForKey. Complete the compaction.
+ try {
+ // make sure all inflight writes have finished
+ outstanding.acquire(MAX_OUTSTANDING);
+ promise.complete(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(e);
+ }
+ return;
+ }
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise,
m.getMessageId());
+ } finally {
+ m.close();
+ }
+ }, scheduler).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
+ Map<String, byte[]> metadata) {
+ CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
+
+ try {
+ bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
+ conf.getManagedLedgerDefaultWriteQuorum(),
+ conf.getManagedLedgerDefaultAckQuorum(),
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+ (rc, ledger, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(ledger);
+ }
+ }, null, metadata);
+ } catch (Throwable t) {
+ log.error("Encountered unexpected error when creating compaction
ledger", t);
+ return FutureUtil.failedFuture(t);
+ }
+ return bkf;
+ }
+
+ protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle
lh) {
+ CompletableFuture<Void> bkf = new CompletableFuture<>();
+ try {
+ bk.asyncDeleteLedger(lh.getId(),
+ (rc, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(null);
+ }
+ }, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
+ return bkf;
+ }
+
+ protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+ CompletableFuture<Void> bkf = new CompletableFuture<>();
+ try {
+ lh.asyncClose((rc, ledger, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(null);
+ }
+ }, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
+ return bkf;
+ }
+
+ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh,
RawMessage m,
+ String topic) {
+ CompletableFuture<Void> bkf = new CompletableFuture<>();
+ ByteBuf serialized = m.serialize();
+ try {
+ mxBean.addCompactionWriteOp(topic,
m.getHeadersAndPayload().readableBytes());
+ long start = System.nanoTime();
+ lh.asyncAddEntry(serialized,
+ (rc, ledger, eid, ctx) -> {
+ mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start,
TimeUnit.NANOSECONDS);
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(null);
+ }
+ }, null);
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
+ return bkf;
+ }
+
+ protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload();
+ MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
+ if (msgMetadata.hasPartitionKey()) {
+ int size = headersAndPayload.readableBytes();
+ if (msgMetadata.hasUncompressedSize()) {
+ size = msgMetadata.getUncompressedSize();
+ }
+ return Pair.of(msgMetadata.getPartitionKey(), size);
+ } else {
+ return null;
+ }
+ }
+
+
+ protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
+ BiPredicate<String, MessageId> filter,
+ boolean retainNullKey)
+ throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("Rebatching message {} for topic {}", msg.getMessageId(),
topic);
+ }
+ return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
+ }
+
+ protected static class PhaseOneResult<T> {
+
+ final MessageId from;
+ final MessageId to; // last undeleted messageId
+ final MessageId lastReadId; // last read messageId
+ final Map<String, T> latestForKey;
+
+ PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId,
+ Map<String, T> latestForKey) {
+ this.from = from;
+ this.to = to;
+ this.lastReadId = lastReadId;
+ this.latestForKey = latestForKey;
+ }
+ }
+
+ public long getPhaseOneLoopReadTimeoutInSeconds() {
+ return phaseOneLoopReadTimeout.getSeconds();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index fe77db33692..ba68e07cf5b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -172,7 +172,7 @@ public class CompactorTool {
@Cleanup
PulsarClient pulsar = createClient(brokerConfig);
- Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk,
scheduler);
+ Compactor compactor = new PublishingOrderCompactor(brokerConfig,
pulsar, bk, scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
log.info("Compaction of topic {} complete. Compacted to ledger {}",
arguments.topic, ledgerId);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
new file mode 100644
index 00000000000..383c7b1aeed
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+
+public class EventTimeCompactionServiceFactory extends
PulsarCompactionServiceFactory {
+
+ @Override
+ protected Compactor newCompactor() throws PulsarServerException {
+ PulsarService pulsarService = getPulsarService();
+ return new EventTimeOrderCompactor(pulsarService.getConfiguration(),
+ pulsarService.getClient(), pulsarService.getBookKeeperClient(),
+ pulsarService.getCompactorExecutor());
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
new file mode 100644
index 00000000000..2cd19ba15d6
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventTimeOrderCompactor extends
AbstractTwoPhaseCompactor<Pair<MessageId, Long>> {
+
+ private static final Logger log =
LoggerFactory.getLogger(EventTimeOrderCompactor.class);
+
+ public EventTimeOrderCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ super(conf, pulsar, bk, scheduler);
+ }
+
+ @Override
+ protected Map<String, MessageId> toLatestMessageIdForKey(
+ Map<String, Pair<MessageId, Long>> latestForKey) {
+ return latestForKey.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().getLeft()));
+ }
+
+ @Override
+ protected boolean compactMessage(String topic, Map<String, Pair<MessageId,
Long>> latestForKey,
+ RawMessage m, MessageId id) {
+ boolean deletedMessage = false;
+ boolean replaceMessage = false;
+ MessageCompactionData mcd = extractMessageCompactionData(m);
+
+ if (mcd != null) {
+ boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
+ .map(Pair::getRight)
+ .map(latestEventTime -> mcd.eventTime() != null
+ && mcd.eventTime() >= latestEventTime).orElse(true);
+ if (newer) {
+ if (mcd.payloadSize() > 0) {
+ Pair<MessageId, Long> old = latestForKey.put(mcd.key(),
+ new ImmutablePair<>(mcd.messageId(), mcd.eventTime()));
+ replaceMessage = old != null;
+ } else {
+ deletedMessage = true;
+ latestForKey.remove(mcd.key());
+ }
+ }
+ } else {
+ if (!topicCompactionRetainNullKey) {
+ deletedMessage = true;
+ }
+ }
+ if (replaceMessage || deletedMessage) {
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ return deletedMessage;
+ }
+
+ @Override
+ protected boolean compactBatchMessage(String topic, Map<String,
Pair<MessageId, Long>> latestForKey, RawMessage m,
+ MessageMetadata metadata, MessageId id) {
+ boolean deletedMessage = false;
+ try {
+ int numMessagesInBatch = metadata.getNumMessagesInBatch();
+ int deleteCnt = 0;
+
+ for (MessageCompactionData mcd :
extractMessageCompactionDataFromBatch(m)) {
+ if (mcd.key() == null) {
+ if (!topicCompactionRetainNullKey) {
+ // record delete null-key message event
+ deleteCnt++;
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ continue;
+ }
+
+ boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
+ .map(Pair::getRight)
+ .map(latestEventTime -> mcd.eventTime() != null
+ && mcd.eventTime() > latestEventTime).orElse(true);
+ if (newer) {
+ if (mcd.payloadSize() > 0) {
+ Pair<MessageId, Long> old = latestForKey.put(mcd.key(),
+ new ImmutablePair<>(mcd.messageId(), mcd.eventTime()));
+ if (old != null) {
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ } else {
+ latestForKey.remove(mcd.key());
+ deleteCnt++;
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ }
+ }
+
+ if (deleteCnt == numMessagesInBatch) {
+ deletedMessage = true;
+ }
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}. Whole batch will be
included in output",
+ id, ioe);
+ }
+ return deletedMessage;
+ }
+
+ protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload();
+ MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
+ if (msgMetadata.hasPartitionKey()) {
+ int size = headersAndPayload.readableBytes();
+ if (msgMetadata.hasUncompressedSize()) {
+ size = msgMetadata.getUncompressedSize();
+ }
+ return new MessageCompactionData(m.getMessageId(),
msgMetadata.getPartitionKey(),
+ size, msgMetadata.getEventTime());
+ } else {
+ return null;
+ }
+ }
+
+ private List<MessageCompactionData>
extractMessageCompactionDataFromBatch(RawMessage msg)
+ throws IOException {
+ return RawBatchConverter.extractMessageCompactionData(msg);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
new file mode 100644
index 00000000000..03800273a80
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public record MessageCompactionData (MessageId messageId, String key, Integer
payloadSize, Long eventTime) {}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
new file mode 100644
index 00000000000..a825c0782fb
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PublishingOrderCompactor extends
AbstractTwoPhaseCompactor<MessageId> {
+
+ private static final Logger log =
LoggerFactory.getLogger(PublishingOrderCompactor.class);
+
+ public PublishingOrderCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ super(conf, pulsar, bk, scheduler);
+ }
+
+ @Override
+ protected Map<String, MessageId> toLatestMessageIdForKey(Map<String,
MessageId> latestForKey) {
+ return latestForKey;
+ }
+
+ @Override
+ protected boolean compactMessage(String topic, Map<String, MessageId>
latestForKey,
+ RawMessage m, MessageId id) {
+ boolean deletedMessage = false;
+ boolean replaceMessage = false;
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ if (keyAndSize != null) {
+ if (keyAndSize.getRight() > 0) {
+ MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
+ replaceMessage = old != null;
+ } else {
+ deletedMessage = true;
+ latestForKey.remove(keyAndSize.getLeft());
+ }
+ } else {
+ if (!topicCompactionRetainNullKey) {
+ deletedMessage = true;
+ }
+ }
+ if (replaceMessage || deletedMessage) {
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ return deletedMessage;
+ }
+
+ @Override
+ protected boolean compactBatchMessage(String topic, Map<String, MessageId>
latestForKey,
+ RawMessage m, MessageMetadata metadata, MessageId id) {
+ boolean deletedMessage = false;
+ try {
+ int numMessagesInBatch = metadata.getNumMessagesInBatch();
+ int deleteCnt = 0;
+ for (ImmutableTriple<MessageId, String, Integer> e :
extractIdsAndKeysAndSizeFromBatch(
+ m)) {
+ if (e != null) {
+ if (e.getMiddle() == null) {
+ if (!topicCompactionRetainNullKey) {
+ // record delete null-key message event
+ deleteCnt++;
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ continue;
+ }
+ if (e.getRight() > 0) {
+ MessageId old = latestForKey.put(e.getMiddle(),
e.getLeft());
+ if (old != null) {
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ } else {
+ latestForKey.remove(e.getMiddle());
+ deleteCnt++;
+ mxBean.addCompactionRemovedEvent(topic);
+ }
+ }
+ }
+ if (deleteCnt == numMessagesInBatch) {
+ deletedMessage = true;
+ }
+ } catch (IOException ioe) {
+ log.info(
+ "Error decoding batch for message {}. Whole batch will be
included in output",
+ id, ioe);
+ }
+
+ return deletedMessage;
+ }
+
+ protected List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSizeFromBatch(
+ RawMessage msg)
+ throws IOException {
+ return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index 424733ad581..90132461b4c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -54,7 +54,7 @@ public class PulsarCompactionServiceFactory implements
CompactionServiceFactory
}
protected Compactor newCompactor() throws PulsarServerException {
- return new TwoPhaseCompactor(pulsarService.getConfiguration(),
+ return new PublishingOrderCompactor(pulsarService.getConfiguration(),
pulsarService.getClient(), pulsarService.getBookKeeperClient(),
pulsarService.getCompactorExecutor());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index fefa2ee959c..1b54092d9aa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* <p>As the first pass caches the entire message(not just offset) for each
key into a map,
* this compaction could be memory intensive if the message payload is large.
*/
-public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+public class StrategicTwoPhaseCompactor extends PublishingOrderCompactor {
private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20
* 1000;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
deleted file mode 100644
index 647c34a94ad..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.compaction;
-
-import io.netty.buffer.ByteBuf;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiPredicate;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.api.RawReader;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.RawBatchConverter;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Compaction will go through the topic in two passes. The first pass
- * selects latest offset for each key in the topic. Then the second pass
- * writes these values to a ledger.
- *
- * <p>The two passes are required to avoid holding the payloads of each of
- * the latest values in memory, as the payload can be many orders of
- * magnitude larger than a message id.
-*/
-public class TwoPhaseCompactor extends Compactor {
- private static final Logger log =
LoggerFactory.getLogger(TwoPhaseCompactor.class);
- private static final int MAX_OUTSTANDING = 500;
- private final Duration phaseOneLoopReadTimeout;
- private final boolean topicCompactionRetainNullKey;
-
- public TwoPhaseCompactor(ServiceConfiguration conf,
- PulsarClient pulsar,
- BookKeeper bk,
- ScheduledExecutorService scheduler) {
- super(conf, pulsar, bk, scheduler);
- phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
- topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
- }
-
- @Override
- protected CompletableFuture<Long> doCompaction(RawReader reader,
BookKeeper bk) {
- return reader.hasMessageAvailableAsync()
- .thenCompose(available -> {
- if (available) {
- return phaseOne(reader).thenCompose(
- (r) -> phaseTwo(reader, r.from, r.to,
r.lastReadId, r.latestForKey, bk));
- } else {
- log.info("Skip compaction of the empty topic {}",
reader.getTopic());
- return CompletableFuture.completedFuture(-1L);
- }
- });
- }
-
- private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
- Map<String, MessageId> latestForKey = new HashMap<>();
- CompletableFuture<PhaseOneResult> loopPromise = new
CompletableFuture<>();
-
- reader.getLastMessageIdAsync()
- .thenAccept(lastMessageId -> {
- log.info("Commencing phase one of compaction for {},
reading to {}",
- reader.getTopic(), lastMessageId);
- // Each entry is processed as a whole, discard the
batchIndex part deliberately.
- MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
- MessageIdImpl lastEntryMessageId = new
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
- lastImpl.getPartitionIndex());
- phaseOneLoop(reader, Optional.empty(), Optional.empty(),
lastEntryMessageId, latestForKey,
- loopPromise);
- }).exceptionally(ex -> {
- loopPromise.completeExceptionally(ex);
- return null;
- });
-
- return loopPromise;
- }
-
- private void phaseOneLoop(RawReader reader,
- Optional<MessageId> firstMessageId,
- Optional<MessageId> toMessageId,
- MessageId lastMessageId,
- Map<String, MessageId> latestForKey,
- CompletableFuture<PhaseOneResult> loopPromise) {
- if (loopPromise.isDone()) {
- return;
- }
- CompletableFuture<RawMessage> future = reader.readNextAsync();
- FutureUtil.addTimeoutHandling(future,
- phaseOneLoopReadTimeout, scheduler,
- () -> FutureUtil.createTimeoutException("Timeout", getClass(),
"phaseOneLoop(...)"));
-
- future.thenAcceptAsync(m -> {
- try (m) {
- MessageId id = m.getMessageId();
- boolean deletedMessage = false;
- boolean replaceMessage = false;
- mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
- MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
- if (Markers.isServerOnlyMarker(metadata)) {
- mxBean.addCompactionRemovedEvent(reader.getTopic());
- deletedMessage = true;
- } else if (RawBatchConverter.isReadableBatch(metadata)) {
- try {
- int numMessagesInBatch =
metadata.getNumMessagesInBatch();
- int deleteCnt = 0;
- for (ImmutableTriple<MessageId, String, Integer> e :
extractIdsAndKeysAndSizeFromBatch(m)) {
- if (e != null) {
- if (e.getMiddle() == null) {
- if (!topicCompactionRetainNullKey) {
- // record delete null-key message event
- deleteCnt++;
-
mxBean.addCompactionRemovedEvent(reader.getTopic());
- }
- continue;
- }
- if (e.getRight() > 0) {
- MessageId old =
latestForKey.put(e.getMiddle(), e.getLeft());
- if (old != null) {
-
mxBean.addCompactionRemovedEvent(reader.getTopic());
- }
- } else {
- latestForKey.remove(e.getMiddle());
- deleteCnt++;
-
mxBean.addCompactionRemovedEvent(reader.getTopic());
- }
- }
- }
- if (deleteCnt == numMessagesInBatch) {
- deletedMessage = true;
- }
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}. Whole
batch will be included in output",
- id, ioe);
- }
- } else {
- Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
- if (keyAndSize != null) {
- if (keyAndSize.getRight() > 0) {
- MessageId old =
latestForKey.put(keyAndSize.getLeft(), id);
- replaceMessage = old != null;
- } else {
- deletedMessage = true;
- latestForKey.remove(keyAndSize.getLeft());
- }
- } else {
- if (!topicCompactionRetainNullKey) {
- deletedMessage = true;
- }
- }
- if (replaceMessage || deletedMessage) {
- mxBean.addCompactionRemovedEvent(reader.getTopic());
- }
- }
- MessageId first = firstMessageId.orElse(deletedMessage ? null
: id);
- MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
- if (id.compareTo(lastMessageId) == 0) {
- loopPromise.complete(new PhaseOneResult(first == null ? id
: first, to == null ? id : to,
- lastMessageId, latestForKey));
- } else {
- phaseOneLoop(reader,
- Optional.ofNullable(first),
- Optional.ofNullable(to),
- lastMessageId,
- latestForKey, loopPromise);
- }
- }
- }, scheduler).exceptionally(ex -> {
- loopPromise.completeExceptionally(ex);
- return null;
- });
- }
-
- private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from,
MessageId to, MessageId lastReadId,
- Map<String, MessageId> latestForKey, BookKeeper bk) {
- Map<String, byte[]> metadata =
-
LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(),
to.toByteArray());
- return createLedger(bk, metadata).thenCompose((ledger) -> {
- log.info("Commencing phase two of compaction for {}, from {} to
{}, compacting {} keys to ledger {}",
- reader.getTopic(), from, to, latestForKey.size(),
ledger.getId());
- return phaseTwoSeekThenLoop(reader, from, to, lastReadId,
latestForKey, bk, ledger);
- });
- }
-
- private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader,
MessageId from, MessageId to,
- MessageId lastReadId, Map<String, MessageId> latestForKey,
BookKeeper bk, LedgerHandle ledger) {
- CompletableFuture<Long> promise = new CompletableFuture<>();
-
- reader.seekAsync(from).thenCompose((v) -> {
- Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
- CompletableFuture<Void> loopPromise = new CompletableFuture<>();
- phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise, MessageId.earliest);
- return loopPromise;
- }).thenCompose((v) -> closeLedger(ledger))
- .thenCompose((v) ->
reader.acknowledgeCumulativeAsync(lastReadId,
- Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
ledger.getId())))
- .whenComplete((res, exception) -> {
- if (exception != null) {
- deleteLedger(bk, ledger).whenComplete((res2,
exception2) -> {
- if (exception2 != null) {
- log.warn("Cleanup of ledger {} for failed",
ledger, exception2);
- }
- // complete with original exception
- promise.completeExceptionally(exception);
- });
- } else {
- promise.complete(ledger.getId());
- }
- });
- return promise;
- }
-
- private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
- LedgerHandle lh, Semaphore outstanding,
CompletableFuture<Void> promise,
- MessageId lastCompactedMessageId) {
- if (promise.isDone()) {
- return;
- }
- reader.readNextAsync().thenAcceptAsync(m -> {
- if (promise.isDone()) {
- m.close();
- return;
- }
-
- if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
- m.close();
- phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise, lastCompactedMessageId);
- return;
- }
-
- try {
- MessageId id = m.getMessageId();
- Optional<RawMessage> messageToAdd = Optional.empty();
- mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
- MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
- if (Markers.isServerOnlyMarker(metadata)) {
- messageToAdd = Optional.empty();
- } else if (RawBatchConverter.isReadableBatch(metadata)) {
- try {
- messageToAdd = rebatchMessage(reader.getTopic(),
- m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}. Whole
batch will be included in output",
- id, ioe);
- messageToAdd = Optional.of(m);
- }
- } else {
- Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
- MessageId msg;
- if (keyAndSize == null) {
- messageToAdd = topicCompactionRetainNullKey ?
Optional.of(m) : Optional.empty();
- } else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
- && msg.equals(id)) { // consider message only if
present into latestForKey map
- if (keyAndSize.getRight() <= 0) {
- promise.completeExceptionally(new
IllegalArgumentException(
- "Compaction phase found empty record from
sorted key-map"));
- }
- messageToAdd = Optional.of(m);
- }
- }
-
- if (messageToAdd.isPresent()) {
- RawMessage message = messageToAdd.get();
- try {
- outstanding.acquire();
- CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, message, reader.getTopic())
- .whenComplete((res, exception2) -> {
- outstanding.release();
- if (exception2 != null) {
-
promise.completeExceptionally(exception2);
- }
- });
- if (to.equals(id)) {
- // make sure all inflight writes have finished
- outstanding.acquire(MAX_OUTSTANDING);
- addFuture.whenComplete((res, exception2) -> {
- if (exception2 == null) {
- promise.complete(null);
- }
- });
- return;
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(ie);
- } finally {
- if (message != m) {
- message.close();
- }
- }
- } else if (to.equals(id)) {
- // Reached to last-id and phase-one found it
deleted-message while iterating on ledger so,
- // not present under latestForKey. Complete the compaction.
- try {
- // make sure all inflight writes have finished
- outstanding.acquire(MAX_OUTSTANDING);
- promise.complete(null);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(e);
- }
- return;
- }
- phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise, m.getMessageId());
- } finally {
- m.close();
- }
- }, scheduler).exceptionally(ex -> {
- promise.completeExceptionally(ex);
- return null;
- });
- }
-
- protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
Map<String, byte[]> metadata) {
- CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
-
- try {
- bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
- conf.getManagedLedgerDefaultWriteQuorum(),
- conf.getManagedLedgerDefaultAckQuorum(),
- Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
- Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
- (rc, ledger, ctx) -> {
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(ledger);
- }
- }, null, metadata);
- } catch (Throwable t) {
- log.error("Encountered unexpected error when creating compaction
ledger", t);
- return FutureUtil.failedFuture(t);
- }
- return bkf;
- }
-
- protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle
lh) {
- CompletableFuture<Void> bkf = new CompletableFuture<>();
- try {
- bk.asyncDeleteLedger(lh.getId(),
- (rc, ctx) -> {
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(null);
- }
- }, null);
- } catch (Throwable t) {
- return FutureUtil.failedFuture(t);
- }
- return bkf;
- }
-
- protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
- CompletableFuture<Void> bkf = new CompletableFuture<>();
- try {
- lh.asyncClose((rc, ledger, ctx) -> {
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(null);
- }
- }, null);
- } catch (Throwable t) {
- return FutureUtil.failedFuture(t);
- }
- return bkf;
- }
-
- private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh,
RawMessage m, String topic) {
- CompletableFuture<Void> bkf = new CompletableFuture<>();
- ByteBuf serialized = m.serialize();
- try {
- mxBean.addCompactionWriteOp(topic,
m.getHeadersAndPayload().readableBytes());
- long start = System.nanoTime();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- mxBean.addCompactionLatencyOp(topic, System.nanoTime()
- start, TimeUnit.NANOSECONDS);
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(null);
- }
- }, null);
- } catch (Throwable t) {
- return FutureUtil.failedFuture(t);
- }
- return bkf;
- }
-
- protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
- MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
- if (msgMetadata.hasPartitionKey()) {
- int size = headersAndPayload.readableBytes();
- if (msgMetadata.hasUncompressedSize()) {
- size = msgMetadata.getUncompressedSize();
- }
- return Pair.of(msgMetadata.getPartitionKey(), size);
- } else {
- return null;
- }
- }
-
- protected List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
- throws IOException {
- return RawBatchConverter.extractIdsAndKeysAndSize(msg);
- }
-
- protected Optional<RawMessage> rebatchMessage(String topic, RawMessage
msg, BiPredicate<String, MessageId> filter,
- boolean retainNullKey)
- throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("Rebatching message {} for topic {}",
msg.getMessageId(), topic);
- }
- return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
- }
-
- private static class PhaseOneResult {
- final MessageId from;
- final MessageId to; // last undeleted messageId
- final MessageId lastReadId; // last read messageId
- final Map<String, MessageId> latestForKey;
-
- PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey) {
- this.from = from;
- this.to = to;
- this.lastReadId = lastReadId;
- this.latestForKey = latestForKey;
- }
- }
-
- public long getPhaseOneLoopReadTimeoutInSeconds() {
- return phaseOneLoopReadTimeout.getSeconds();
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index ac1ba6bc814..45dc30d21df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -62,7 +62,7 @@ import org.testng.annotations.Test;
public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
- private TwoPhaseCompactor compactor;
+ private PublishingOrderCompactor compactor;
@BeforeMethod
@Override
@@ -79,7 +79,7 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
- compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor = new PublishingOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 0cf32859e3d..19f42a7e057 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -109,7 +109,7 @@ import org.testng.annotations.Test;
public class CompactionTest extends MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
- private TwoPhaseCompactor compactor;
+ private PublishingOrderCompactor compactor;
@BeforeMethod
@Override
@@ -124,7 +124,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
- compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor = new PublishingOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
@AfterMethod(alwaysRun = true)
@@ -147,7 +147,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
return compactor.compact(topic).get();
}
- protected TwoPhaseCompactor getCompactor() {
+ protected PublishingOrderCompactor getCompactor() {
return compactor;
}
@@ -656,7 +656,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws
Exception {
conf.setTopicCompactionRetainNullKey(retainNullKey);
restartBroker();
- FieldUtils.writeDeclaredField(compactor,
"topicCompactionRetainNullKey", retainNullKey, true);
+ FieldUtils.writeField(compactor, "topicCompactionRetainNullKey",
retainNullKey, true);
String topic = "persistent://my-property/use/my-ns/my-topic1";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 1c09dc0d643..5cf7d33200d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -101,7 +101,7 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null).get();
- compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor = new PublishingOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
@@ -127,7 +127,7 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
return compactor;
}
- private List<String> compactAndVerify(String topic, Map<String, byte[]>
expected, boolean checkMetrics)
+ protected List<String> compactAndVerify(String topic, Map<String, byte[]>
expected, boolean checkMetrics)
throws Exception {
long compactedLedgerId = compact(topic);
@@ -361,7 +361,7 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
- TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
mockClient,
+ PublishingOrderCompactor compactor = new
PublishingOrderCompactor(configuration, mockClient,
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(),
60);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
new file mode 100644
index 00000000000..8fba0983123
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class EventTimeOrderCompactorTest extends CompactorTest {
+
+ private EventTimeOrderCompactor compactor;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ compactor = new EventTimeOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ }
+
+ @Override
+ protected long compact(String topic) throws ExecutionException,
InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+ @Override
+ protected Compactor getCompactor() {
+ return compactor;
+ }
+
+ @Test
+ public void testCompactedOutByEventTime() throws Exception {
+ String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testCompactedOutByEventTime");
+ this.restartBroker();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
+
+ producer.newMessage().key("K1").value("V1").eventTime(1L).sendAsync();
+ producer.newMessage().key("K2").value("V2").eventTime(1L).sendAsync();
+ producer.newMessage().key("K2").value(null).eventTime(2L).sendAsync();
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ var attributes = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+ .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property")
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns")
+ .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+ .build();
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_REMOVED_COUNTER, attributes, 1);
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+ .putAll(attributes)
+ .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success")
+ .build(),
+ 1);
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+ .putAll(attributes)
+ .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure")
+ .build(),
+ 0);
+ assertMetricDoubleSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_DURATION_SECONDS, attributes,
+ actual -> assertThat(actual).isPositive());
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_BYTES_IN_COUNTER, attributes,
+ actual -> assertThat(actual).isPositive());
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_BYTES_OUT_COUNTER, attributes,
+ actual -> assertThat(actual).isPositive());
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_ENTRIES_COUNTER, attributes, 1);
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.COMPACTION_BYTES_COUNTER, attributes,
+ actual -> assertThat(actual).isPositive());
+
+ producer.newMessage().key("K1").eventTime(2L).value("V1-2").sendAsync();
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ @Cleanup
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .subscriptionName("reader-test")
+ .topic(topicName)
+ .readCompacted(true)
+ .startMessageId(MessageId.earliest)
+ .create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
+ Assert.assertEquals(message.getEventTime(), 2L);
+ }
+ }
+
+ @Test
+ public void testCompactWithEventTimeAddCompact() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ Map<String, byte[]> expected = new HashMap<>();
+
+ producer.newMessage()
+ .key("a")
+ .eventTime(1L)
+ .value("A_1".getBytes())
+ .send();
+ producer.newMessage()
+ .key("b")
+ .eventTime(1L)
+ .value("B_1".getBytes())
+ .send();
+ producer.newMessage()
+ .key("a")
+ .eventTime(2L)
+ .value("A_2".getBytes())
+ .send();
+ expected.put("a", "A_2".getBytes());
+ expected.put("b", "B_1".getBytes());
+
+ compactAndVerify(topic, new HashMap<>(expected), false);
+
+ producer.newMessage()
+ .key("b")
+ .eventTime(2L)
+ .value("B_2".getBytes())
+ .send();
+ expected.put("b", "B_2".getBytes());
+
+ compactAndVerify(topic, expected, false);
+ }
+
+ @Override
+ @Test
+ public void testPhaseOneLoopTimeConfiguration() {
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
+ EventTimeOrderCompactor compactor = new
EventTimeOrderCompactor(configuration, mockClient,
+ Mockito.mock(BookKeeper.class), compactionScheduler);
+ Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 54563431052..d1ff46cbc02 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -74,7 +74,7 @@ public class StrategicCompactionTest extends CompactionTest {
}
@Override
- protected TwoPhaseCompactor getCompactor() {
+ protected PublishingOrderCompactor getCompactor() {
return compactor;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index 2aa09309d39..9f33479ce4c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -53,7 +53,7 @@ public class TopicCompactionServiceTest extends
MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
- private TwoPhaseCompactor compactor;
+ private PublishingOrderCompactor compactor;
@BeforeMethod
@Override
@@ -73,7 +73,7 @@ public class TopicCompactionServiceTest extends
MockedPulsarServiceBaseTest {
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null).get();
- compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor = new PublishingOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index ab4925bfeb8..74c2a93b84e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -62,7 +62,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.PublishingOrderCompactor;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -259,7 +259,7 @@ public class PulsarFunctionE2ETest extends
AbstractPulsarE2ETest {
@Cleanup("shutdownNow")
ScheduledExecutorService compactionScheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
+        PublishingOrderCompactor twoPhaseCompactor = new PublishingOrderCompactor(config,
                 pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
compactionScheduler);
twoPhaseCompactor.compact(sourceTopic).get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 7edc87bb996..be2b377a9cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -49,7 +49,7 @@ import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.PublishingOrderCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -107,7 +107,7 @@ public class PulsarSinkE2ETest extends
AbstractPulsarE2ETest {
@Cleanup("shutdownNow")
ScheduledExecutorService compactionScheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
+        PublishingOrderCompactor twoPhaseCompactor = new PublishingOrderCompactor(config,
                 pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler);
twoPhaseCompactor.compact(sourceTopic).get();