This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c495e190b3 [feat] PIP-352: Event time based compaction (#22517)
1c495e190b3 is described below

commit 1c495e190b3c569e9dfd44acef2a697c93a1f771
Author: Marek Czajkowski <[email protected]>
AuthorDate: Fri Aug 23 16:49:15 2024 +0200

    [feat] PIP-352: Event time based compaction (#22517)
---
 conf/broker.conf                                   |   6 +
 conf/standalone.conf                               |   6 +
 .../pulsar/client/impl/RawBatchConverter.java      |  33 +-
 .../compaction/AbstractTwoPhaseCompactor.java      | 439 +++++++++++++++++++
 .../apache/pulsar/compaction/CompactorTool.java    |   2 +-
 .../EventTimeCompactionServiceFactory.java         |  33 ++
 .../pulsar/compaction/EventTimeOrderCompactor.java | 161 +++++++
 .../pulsar/compaction/MessageCompactionData.java   |  23 +
 .../compaction/PublishingOrderCompactor.java       | 127 ++++++
 .../compaction/PulsarCompactionServiceFactory.java |   2 +-
 .../compaction/StrategicTwoPhaseCompactor.java     |   2 +-
 .../pulsar/compaction/TwoPhaseCompactor.java       | 470 ---------------------
 .../pulsar/compaction/CompactionRetentionTest.java |   4 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   8 +-
 .../apache/pulsar/compaction/CompactorTest.java    |   6 +-
 .../compaction/EventTimeOrderCompactorTest.java    | 201 +++++++++
 .../pulsar/compaction/StrategicCompactionTest.java |   2 +-
 .../compaction/TopicCompactionServiceTest.java     |   4 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |   4 +-
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |   4 +-
 20 files changed, 1037 insertions(+), 500 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e5d8a32e717..fc32246adea 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -563,6 +563,12 @@ brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
 # Whether retain null-key message during topic compaction
 topicCompactionRetainNullKey=false
 
+# Class name of the factory that implements the topic compaction service.
+# If set to "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
+# the broker creates a topic compaction service based on message eventTime.
+# By default, the compaction service is based on message publishing order.
+compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 30b39af8869..ae696410d86 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1318,3 +1318,9 @@ disableBrokerInterceptors=true
 
 # Whether retain null-key message during topic compaction
 topicCompactionRetainNullKey=false
+
+# Class name of the factory that implements the topic compaction service.
+# If set to "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
+# the broker creates a topic compaction service based on message eventTime.
+# By default, the compaction service is based on message publishing order.
+compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4c24f6d3036..f41a7aedd59 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.compaction.MessageCompactionData;
 
 public class RawBatchConverter {
 
@@ -51,8 +52,8 @@ public class RawBatchConverter {
         return metadata.hasNumMessagesInBatch() && 
metadata.getEncryptionKeysCount() == 0;
     }
 
-    public static List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSize(RawMessage msg)
-            throws IOException {
+    public static List<MessageCompactionData> 
extractMessageCompactionData(RawMessage msg)
+        throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
         ByteBuf payload = msg.getHeadersAndPayload();
@@ -64,25 +65,35 @@ public class RawBatchConverter {
         int uncompressedSize = metadata.getUncompressedSize();
         ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
 
-        List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = 
new ArrayList<>();
+        List<MessageCompactionData> messageCompactionDataList = new 
ArrayList<>();
 
         SingleMessageMetadata smm = new SingleMessageMetadata();
         for (int i = 0; i < batchSize; i++) {
             ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
-                                                                               
     smm,
-                                                                               
     0, batchSize);
+                smm,
+                0, batchSize);
             MessageId id = new 
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
-                                                  
msg.getMessageIdData().getEntryId(),
-                                                  
msg.getMessageIdData().getPartition(),
-                                                  i);
+                msg.getMessageIdData().getEntryId(),
+                msg.getMessageIdData().getPartition(),
+                i);
             if (!smm.isCompactedOut()) {
-                idsAndKeysAndSize.add(ImmutableTriple.of(id,
-                            smm.hasPartitionKey() ? smm.getPartitionKey() : 
null,
-                            smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
+                messageCompactionDataList.add(new MessageCompactionData(id,
+                    smm.hasPartitionKey() ? smm.getPartitionKey() : null,
+                    smm.hasPayloadSize() ? smm.getPayloadSize() : 0, 
smm.getEventTime()));
             }
             singleMessagePayload.release();
         }
         uncompressedPayload.release();
+        return messageCompactionDataList;
+    }
+
+    public static List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSize(
+        RawMessage msg)
+        throws IOException {
+        List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = 
new ArrayList<>();
+        for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
+            idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), 
mcd.key(), mcd.payloadSize()));
+        }
         return idsAndKeysAndSize;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
new file mode 100644
index 00000000000..5b03f270251
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects latest offset for each key in the topic. Then the second pass
+ * writes these values to a ledger.
+ *
+ * <p>The two passes are required to avoid holding the payloads of each of
+ * the latest values in memory, as the payload can be many orders of
+ * magnitude larger than a message id.
+ */
+public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
+  protected static final int MAX_OUTSTANDING = 500;
+  protected final Duration phaseOneLoopReadTimeout;
+  protected final boolean topicCompactionRetainNullKey;
+
+  public AbstractTwoPhaseCompactor(ServiceConfiguration conf,
+      PulsarClient pulsar,
+      BookKeeper bk,
+      ScheduledExecutorService scheduler) {
+    super(conf, pulsar, bk, scheduler);
+    phaseOneLoopReadTimeout = Duration.ofSeconds(
+        conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+    topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
+  }
+
+  protected abstract Map<String, MessageId> 
toLatestMessageIdForKey(Map<String, T> latestForKey);
+
+  protected abstract boolean compactMessage(String topic, Map<String, T> 
latestForKey,
+      RawMessage m, MessageId id);
+
+
+  protected abstract boolean compactBatchMessage(String topic, Map<String, T> 
latestForKey,
+      RawMessage m,
+      MessageMetadata metadata, MessageId id);
+
+  /**
+   * Entry point of the two-phase compaction. When the reader reports no available message the
+   * topic is skipped and the returned future completes with {@code -1}; otherwise phase one is
+   * run and its result is handed to phase two.
+   *
+   * @param reader raw reader on the topic being compacted
+   * @param bk BookKeeper client passed on to phase two
+   * @return future of the value produced by phase two, or {@code -1} for an empty topic
+   */
+  @Override
+  protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
+    return reader.hasMessageAvailableAsync().thenCompose(hasMessages -> {
+      if (!hasMessages) {
+        log.info("Skip compaction of the empty topic {}", reader.getTopic());
+        return CompletableFuture.completedFuture(-1L);
+      }
+      return phaseOne(reader).thenCompose(
+          (phaseOneResult) -> phaseTwo(reader, phaseOneResult.from, phaseOneResult.to,
+              phaseOneResult.lastReadId,
+              toLatestMessageIdForKey(phaseOneResult.latestForKey), bk));
+    });
+  }
+
+  /**
+   * Runs the first compaction pass: resolves the reader's last message id and then reads
+   * messages one by one, letting the subclass record the latest entry for every key.
+   *
+   * @param reader raw reader on the topic being compacted
+   * @return future completed with the phase-one result, or failed with the first error raised
+   *     while resolving the last message id or while reading
+   */
+  private CompletableFuture<PhaseOneResult<T>> phaseOne(RawReader reader) {
+    Map<String, T> latestForKey = new HashMap<>();
+    CompletableFuture<PhaseOneResult<T>> loopPromise = new CompletableFuture<>();
+
+    reader.getLastMessageIdAsync()
+        .thenAccept(lastMessageId -> {
+          log.info("Commencing phase one of compaction for {}, reading to {}",
+              reader.getTopic(), lastMessageId);
+          // Each entry is processed as a whole, discard the batchIndex part deliberately.
+          MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
+          MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(),
+              lastImpl.getEntryId(),
+              lastImpl.getPartitionIndex());
+          phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,
+              loopPromise);
+        }).exceptionally(ex -> {
+          loopPromise.completeExceptionally(ex);
+          return null;
+        });
+
+    return loopPromise;
+  }
+
+  /**
+   * Reads the next message (bounded by {@code phaseOneLoopReadTimeout}), updates
+   * {@code latestForKey} through the subclass hooks and either completes {@code loopPromise}
+   * when the message id equals {@code lastMessageId} or schedules the next iteration.
+   *
+   * @param reader raw reader on the topic being compacted
+   * @param firstMessageId id of the first message that was not deleted so far, if any
+   * @param toMessageId id of the most recent message that was not deleted so far, if any
+   * @param lastMessageId id at which phase one stops
+   * @param latestForKey per-key state accumulated by the subclass hooks
+   * @param loopPromise promise completed with the result, or failed on read/processing errors
+   */
+  private void phaseOneLoop(RawReader reader,
+      Optional<MessageId> firstMessageId,
+      Optional<MessageId> toMessageId,
+      MessageId lastMessageId,
+      Map<String, T> latestForKey,
+      CompletableFuture<PhaseOneResult<T>> loopPromise) {
+    if (loopPromise.isDone()) {
+      return;
+    }
+    CompletableFuture<RawMessage> future = reader.readNextAsync();
+    FutureUtil.addTimeoutHandling(future,
+        phaseOneLoopReadTimeout, scheduler,
+        () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
+
+    future.thenAcceptAsync(m -> {
+      try (m) {
+        MessageId id = m.getMessageId();
+        boolean deletedMessage = false;
+        mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
+        MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+        if (Markers.isServerOnlyMarker(metadata)) {
+          // Server-only markers never reach the compacted output.
+          mxBean.addCompactionRemovedEvent(reader.getTopic());
+          deletedMessage = true;
+        } else if (RawBatchConverter.isReadableBatch(metadata)) {
+          deletedMessage = compactBatchMessage(reader.getTopic(), latestForKey, m, metadata, id);
+        } else {
+          deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, id);
+        }
+        // Track the first and the most recent message ids that were not deleted.
+        MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
+        MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
+        if (id.compareTo(lastMessageId) == 0) {
+          // Fall back to the current id when every message seen so far was deleted.
+          loopPromise.complete(new PhaseOneResult<>(first == null ? id : first, to == null ? id : to,
+              lastMessageId, latestForKey));
+        } else {
+          phaseOneLoop(reader,
+              Optional.ofNullable(first),
+              Optional.ofNullable(to),
+              lastMessageId,
+              latestForKey, loopPromise);
+        }
+      }
+    }, scheduler).exceptionally(ex -> {
+      loopPromise.completeExceptionally(ex);
+      return null;
+    });
+  }
+
+  private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
+      MessageId lastReadId,
+      Map<String, MessageId> latestForKey, BookKeeper bk) {
+    Map<String, byte[]> metadata =
+        LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), 
to.toByteArray());
+    return createLedger(bk, metadata).thenCompose((ledger) -> {
+      log.info(
+          "Commencing phase two of compaction for {}, from {} to {}, 
compacting {} keys to ledger {}",
+          reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
+      return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, 
bk, ledger);
+    });
+  }
+
+  private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from,
+      MessageId to,
+      MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk,
+      LedgerHandle ledger) {
+    CompletableFuture<Long> promise = new CompletableFuture<>();
+
+    reader.seekAsync(from).thenCompose((v) -> {
+          Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+          CompletableFuture<Void> loopPromise = new CompletableFuture<>();
+          phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise, MessageId.earliest);
+          return loopPromise;
+        }).thenCompose((v) -> closeLedger(ledger))
+        .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
+            Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
+        .whenComplete((res, exception) -> {
+          if (exception != null) {
+            deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+              if (exception2 != null) {
+                log.warn("Cleanup of ledger {} for failed", ledger, 
exception2);
+              }
+              // complete with original exception
+              promise.completeExceptionally(exception);
+            });
+          } else {
+            promise.complete(ledger.getId());
+          }
+        });
+    return promise;
+  }
+
+  /**
+   * One iteration of the second compaction pass: reads the next message, decides whether it
+   * (or a re-batched subset of it) belongs in the compacted ledger, writes it, and recurses
+   * until the message with id {@code to} has been handled.
+   *
+   * @param reader raw reader on the topic being compacted
+   * @param to id of the last message phase two has to process
+   * @param latestForKey message id selected for each key by phase one
+   * @param lh handle of the compacted ledger being written
+   * @param outstanding semaphore bounding the number of in-flight ledger writes
+   * @param promise completed with {@code null} once {@code to} is handled and all writes have
+   *     finished, or failed on the first error
+   * @param lastCompactedMessageId id of the previously processed message; messages with an id
+   *     less than or equal to it are skipped
+   */
+  private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
+      LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
+      MessageId lastCompactedMessageId) {
+    if (promise.isDone()) {
+      return;
+    }
+    reader.readNextAsync().thenAcceptAsync(m -> {
+      if (promise.isDone()) {
+        m.close();
+        return;
+      }
+
+      if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
+        m.close();
+        phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
+        return;
+      }
+
+      try {
+        MessageId id = m.getMessageId();
+        Optional<RawMessage> messageToAdd = Optional.empty();
+        mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
+        MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+        if (Markers.isServerOnlyMarker(metadata)) {
+          messageToAdd = Optional.empty();
+        } else if (RawBatchConverter.isReadableBatch(metadata)) {
+          try {
+            messageToAdd = rebatchMessage(reader.getTopic(),
+                m, (key, subid) -> subid.equals(latestForKey.get(key)),
+                topicCompactionRetainNullKey);
+          } catch (IOException ioe) {
+            log.info("Error decoding batch for message {}. Whole batch will be included in output",
+                id, ioe);
+            messageToAdd = Optional.of(m);
+          }
+        } else {
+          Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+          MessageId msg;
+          if (keyAndSize == null) {
+            messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty();
+          } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+              && msg.equals(id)) { // consider message only if present into latestForKey map
+            if (keyAndSize.getRight() <= 0) {
+              promise.completeExceptionally(new IllegalArgumentException(
+                  "Compaction phase found empty record from sorted key-map"));
+              // The compaction has failed: do not write the offending record to the ledger.
+              // The enclosing finally block still closes the message.
+              return;
+            }
+            messageToAdd = Optional.of(m);
+          }
+        }
+
+        if (messageToAdd.isPresent()) {
+          RawMessage message = messageToAdd.get();
+          try {
+            outstanding.acquire();
+            CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message, reader.getTopic())
+                .whenComplete((res, exception2) -> {
+                  outstanding.release();
+                  if (exception2 != null) {
+                    promise.completeExceptionally(exception2);
+                  }
+                });
+            if (to.equals(id)) {
+              // make sure all inflight writes have finished
+              outstanding.acquire(MAX_OUTSTANDING);
+              addFuture.whenComplete((res, exception2) -> {
+                if (exception2 == null) {
+                  promise.complete(null);
+                }
+              });
+              return;
+            }
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            promise.completeExceptionally(ie);
+          } finally {
+            // A re-batched message is a separate instance and must be closed on its own.
+            if (message != m) {
+              message.close();
+            }
+          }
+        } else if (to.equals(id)) {
+          // Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
+          // not present under latestForKey. Complete the compaction.
+          try {
+            // make sure all inflight writes have finished
+            outstanding.acquire(MAX_OUTSTANDING);
+            promise.complete(null);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            promise.completeExceptionally(e);
+          }
+          return;
+        }
+        phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId());
+      } finally {
+        m.close();
+      }
+    }, scheduler).exceptionally(ex -> {
+      promise.completeExceptionally(ex);
+      return null;
+    });
+  }
+
+  protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk,
+      Map<String, byte[]> metadata) {
+    CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
+
+    try {
+      bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
+          conf.getManagedLedgerDefaultWriteQuorum(),
+          conf.getManagedLedgerDefaultAckQuorum(),
+          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+          (rc, ledger, ctx) -> {
+            if (rc != BKException.Code.OK) {
+              bkf.completeExceptionally(BKException.create(rc));
+            } else {
+              bkf.complete(ledger);
+            }
+          }, null, metadata);
+    } catch (Throwable t) {
+      log.error("Encountered unexpected error when creating compaction 
ledger", t);
+      return FutureUtil.failedFuture(t);
+    }
+    return bkf;
+  }
+
+  protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle 
lh) {
+    CompletableFuture<Void> bkf = new CompletableFuture<>();
+    try {
+      bk.asyncDeleteLedger(lh.getId(),
+          (rc, ctx) -> {
+            if (rc != BKException.Code.OK) {
+              bkf.completeExceptionally(BKException.create(rc));
+            } else {
+              bkf.complete(null);
+            }
+          }, null);
+    } catch (Throwable t) {
+      return FutureUtil.failedFuture(t);
+    }
+    return bkf;
+  }
+
+  protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+    CompletableFuture<Void> bkf = new CompletableFuture<>();
+    try {
+      lh.asyncClose((rc, ledger, ctx) -> {
+        if (rc != BKException.Code.OK) {
+          bkf.completeExceptionally(BKException.create(rc));
+        } else {
+          bkf.complete(null);
+        }
+      }, null);
+    } catch (Throwable t) {
+      return FutureUtil.failedFuture(t);
+    }
+    return bkf;
+  }
+
+  private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, 
RawMessage m,
+      String topic) {
+    CompletableFuture<Void> bkf = new CompletableFuture<>();
+    ByteBuf serialized = m.serialize();
+    try {
+      mxBean.addCompactionWriteOp(topic, 
m.getHeadersAndPayload().readableBytes());
+      long start = System.nanoTime();
+      lh.asyncAddEntry(serialized,
+          (rc, ledger, eid, ctx) -> {
+            mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, 
TimeUnit.NANOSECONDS);
+            if (rc != BKException.Code.OK) {
+              bkf.completeExceptionally(BKException.create(rc));
+            } else {
+              bkf.complete(null);
+            }
+          }, null);
+    } catch (Throwable t) {
+      return FutureUtil.failedFuture(t);
+    }
+    return bkf;
+  }
+
+  /**
+   * Extracts the partition key and a size figure from a single (non-batch) raw message.
+   *
+   * @param m raw message whose metadata is parsed
+   * @return pair of (partition key, size), where size is the metadata's uncompressed size when
+   *     present and otherwise the bytes still readable in the buffer after parsing the metadata;
+   *     {@code null} when the message carries no partition key
+   */
+  protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+    ByteBuf buffer = m.getHeadersAndPayload();
+    MessageMetadata parsed = Commands.parseMessageMetadata(buffer);
+    if (!parsed.hasPartitionKey()) {
+      return null;
+    }
+    int payloadSize = parsed.hasUncompressedSize()
+        ? parsed.getUncompressedSize()
+        : buffer.readableBytes();
+    return Pair.of(parsed.getPartitionKey(), payloadSize);
+  }
+
+
+  protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
+      BiPredicate<String, MessageId> filter,
+      boolean retainNullKey)
+      throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("Rebatching message {} for topic {}", msg.getMessageId(), 
topic);
+    }
+    return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
+  }
+
+  protected static class PhaseOneResult<T> {
+
+    final MessageId from;
+    final MessageId to; // last undeleted messageId
+    final MessageId lastReadId; // last read messageId
+    final Map<String, T> latestForKey;
+
+    PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId,
+        Map<String, T> latestForKey) {
+      this.from = from;
+      this.to = to;
+      this.lastReadId = lastReadId;
+      this.latestForKey = latestForKey;
+    }
+  }
+
+  public long getPhaseOneLoopReadTimeoutInSeconds() {
+    return phaseOneLoopReadTimeout.getSeconds();
+  }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index fe77db33692..ba68e07cf5b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -172,7 +172,7 @@ public class CompactorTool {
         @Cleanup
         PulsarClient pulsar = createClient(brokerConfig);
 
-        Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, 
scheduler);
+        Compactor compactor = new PublishingOrderCompactor(brokerConfig, 
pulsar, bk, scheduler);
         long ledgerId = compactor.compact(arguments.topic).get();
         log.info("Compaction of topic {} complete. Compacted to ledger {}", 
arguments.topic, ledgerId);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
new file mode 100644
index 00000000000..383c7b1aeed
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+
+public class EventTimeCompactionServiceFactory extends 
PulsarCompactionServiceFactory {
+
+  @Override
+  protected Compactor newCompactor() throws PulsarServerException {
+    PulsarService pulsarService = getPulsarService();
+    return new EventTimeOrderCompactor(pulsarService.getConfiguration(),
+        pulsarService.getClient(), pulsarService.getBookKeeperClient(),
+        pulsarService.getCompactorExecutor());
+  }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
new file mode 100644
index 00000000000..2cd19ba15d6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventTimeOrderCompactor extends 
AbstractTwoPhaseCompactor<Pair<MessageId, Long>> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(EventTimeOrderCompactor.class);
+
+  public EventTimeOrderCompactor(ServiceConfiguration conf,
+      PulsarClient pulsar,
+      BookKeeper bk,
+      ScheduledExecutorService scheduler) {
+    super(conf, pulsar, bk, scheduler);
+  }
+
+  @Override
+  protected Map<String, MessageId> toLatestMessageIdForKey(
+      Map<String, Pair<MessageId, Long>> latestForKey) {
+    return latestForKey.entrySet()
+        .stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> entry.getValue().getLeft()));
+  }
+
+  /**
+   * Phase-one handling of a single non-batched message. A keyed message replaces the entry
+   * recorded for its key when no event time is recorded for that key yet, or when its own event
+   * time is not older than the recorded one; a non-positive payload size removes the key
+   * instead. Messages without a key are dropped unless null-key retention is enabled.
+   *
+   * @param topic topic name used for compaction metrics
+   * @param latestForKey per-key (message id, event time) state, updated in place
+   * @param m raw message being inspected
+   * @param id id of {@code m} (not used by this implementation)
+   * @return {@code true} when the message itself is deleted by compaction
+   */
+  @Override
+  protected boolean compactMessage(String topic, Map<String, Pair<MessageId, Long>> latestForKey,
+      RawMessage m, MessageId id) {
+    MessageCompactionData data = extractMessageCompactionData(m);
+    boolean dropped = false;
+    boolean replacedPrevious = false;
+
+    if (data == null) {
+      // No partition key: the message only survives when null-key retention is enabled.
+      dropped = !topicCompactionRetainNullKey;
+    } else {
+      Pair<MessageId, Long> current = latestForKey.get(data.key());
+      Long currentEventTime = current == null ? null : current.getRight();
+      boolean newer = currentEventTime == null
+          || (data.eventTime() != null && data.eventTime() >= currentEventTime);
+      if (newer && data.payloadSize() > 0) {
+        Pair<MessageId, Long> previous = latestForKey.put(data.key(),
+            new ImmutablePair<>(data.messageId(), data.eventTime()));
+        replacedPrevious = previous != null;
+      } else if (newer) {
+        // Empty payload acts as a delete marker for the key.
+        latestForKey.remove(data.key());
+        dropped = true;
+      }
+    }
+
+    if (dropped || replacedPrevious) {
+      mxBean.addCompactionRemovedEvent(topic);
+    }
+    return dropped;
+  }
+
+  /**
+   * Phase-one handling of a readable batch. Every entry of the batch is evaluated like a single
+   * message: a keyed entry replaces the recorded entry for its key when no event time is
+   * recorded yet or when its event time is not older than the recorded one (ties go to the
+   * later entry, matching {@code compactMessage}); a non-positive payload size removes the key.
+   *
+   * @param topic topic name used for compaction metrics
+   * @param latestForKey per-key (message id, event time) state, updated in place
+   * @param m raw batch message being inspected
+   * @param metadata parsed metadata of {@code m}, used for the number of messages in the batch
+   * @param id id of {@code m}, used for logging
+   * @return {@code true} when every message in the batch was counted as deleted; {@code false}
+   *     otherwise, including when the batch cannot be decoded
+   */
+  @Override
+  protected boolean compactBatchMessage(String topic,
+      Map<String, Pair<MessageId, Long>> latestForKey, RawMessage m,
+      MessageMetadata metadata, MessageId id) {
+    boolean deletedMessage = false;
+    try {
+      int numMessagesInBatch = metadata.getNumMessagesInBatch();
+      int deleteCnt = 0;
+
+      for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m)) {
+        if (mcd.key() == null) {
+          if (!topicCompactionRetainNullKey) {
+            // record delete null-key message event
+            deleteCnt++;
+            mxBean.addCompactionRemovedEvent(topic);
+          }
+          continue;
+        }
+
+        // Use >= so that, for equal event times, the later entry wins, exactly as in
+        // compactMessage.
+        boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
+            .map(Pair::getRight)
+            .map(latestEventTime -> mcd.eventTime() != null
+                && mcd.eventTime() >= latestEventTime).orElse(true);
+        if (newer) {
+          if (mcd.payloadSize() > 0) {
+            Pair<MessageId, Long> old = latestForKey.put(mcd.key(),
+                new ImmutablePair<>(mcd.messageId(), mcd.eventTime()));
+            if (old != null) {
+              mxBean.addCompactionRemovedEvent(topic);
+            }
+          } else {
+            // Empty payload acts as a delete marker for the key.
+            latestForKey.remove(mcd.key());
+            deleteCnt++;
+            mxBean.addCompactionRemovedEvent(topic);
+          }
+        }
+      }
+
+      if (deleteCnt == numMessagesInBatch) {
+        deletedMessage = true;
+      }
+    } catch (IOException ioe) {
+      log.info("Error decoding batch for message {}. Whole batch will be included in output",
+          id, ioe);
+    }
+    return deletedMessage;
+  }
+
+  /**
+   * Builds the compaction data (message id, key, size, event time) of a single raw message.
+   *
+   * @param m raw message whose metadata is parsed
+   * @return the compaction data, with the size taken from the metadata's uncompressed size when
+   *     present and otherwise from the bytes still readable in the buffer after parsing the
+   *     metadata; {@code null} when the message carries no partition key
+   */
+  protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
+    ByteBuf buffer = m.getHeadersAndPayload();
+    MessageMetadata parsed = Commands.parseMessageMetadata(buffer);
+    if (!parsed.hasPartitionKey()) {
+      return null;
+    }
+    int payloadSize = parsed.hasUncompressedSize()
+        ? parsed.getUncompressedSize()
+        : buffer.readableBytes();
+    return new MessageCompactionData(m.getMessageId(), parsed.getPartitionKey(),
+        payloadSize, parsed.getEventTime());
+  }
+
+  private List<MessageCompactionData> 
extractMessageCompactionDataFromBatch(RawMessage msg)
+      throws IOException {
+    return RawBatchConverter.extractMessageCompactionData(msg);
+  }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
new file mode 100644
index 00000000000..03800273a80
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public record MessageCompactionData (MessageId messageId, String key, Integer 
payloadSize, Long eventTime) {}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
new file mode 100644
index 00000000000..a825c0782fb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Two-phase compactor that remembers, for every key, the id of the last message handed to it
+ * for that key.
+ *
+ * <p>A keyed message whose size is not positive removes its key from the map. Messages without
+ * a key are counted as deleted unless {@code topicCompactionRetainNullKey} is set.
+ */
+public class PublishingOrderCompactor extends AbstractTwoPhaseCompactor<MessageId> {
+
+    private static final Logger log = LoggerFactory.getLogger(PublishingOrderCompactor.class);
+
+    /** Hands all collaborators straight through to the base compactor. */
+    public PublishingOrderCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk,
+        ScheduledExecutorService scheduler) {
+        super(conf, pulsar, bk, scheduler);
+    }
+
+    /** The map already holds message ids, so it is returned as-is. */
+    @Override
+    protected Map<String, MessageId> toLatestMessageIdForKey(Map<String, MessageId> latestForKey) {
+        return latestForKey;
+    }
+
+    /**
+     * Applies a single (non-batch) message to {@code latestForKey}.
+     *
+     * @return {@code true} when the message counts as deleted: either it has a key and a
+     *     non-positive size, or it has no key and null keys are not retained
+     */
+    @Override
+    protected boolean compactMessage(String topic, Map<String, MessageId> latestForKey,
+        RawMessage m, MessageId id) {
+        Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+        final boolean deleted;
+        final boolean replaced;
+        if (keyAndSize == null) {
+            // No key: the message only survives when null keys are retained.
+            deleted = !topicCompactionRetainNullKey;
+            replaced = false;
+        } else if (keyAndSize.getRight() > 0) {
+            replaced = latestForKey.put(keyAndSize.getLeft(), id) != null;
+            deleted = false;
+        } else {
+            // Non-positive size: forget whatever was recorded for this key.
+            latestForKey.remove(keyAndSize.getLeft());
+            deleted = true;
+            replaced = false;
+        }
+        if (deleted || replaced) {
+            mxBean.addCompactionRemovedEvent(topic);
+        }
+        return deleted;
+    }
+
+    /**
+     * Applies every entry of a batch message to {@code latestForKey}.
+     *
+     * @return {@code true} only when the number of deleted entries equals the batch size taken
+     *     from {@code metadata}; {@code false} when the batch cannot be decoded
+     */
+    @Override
+    protected boolean compactBatchMessage(String topic, Map<String, MessageId> latestForKey,
+        RawMessage m, MessageMetadata metadata, MessageId id) {
+        try {
+            // Read the batch size before the batch is decoded, as the original code does.
+            int batchSize = metadata.getNumMessagesInBatch();
+            int removed = 0;
+            for (ImmutableTriple<MessageId, String, Integer> entry
+                : extractIdsAndKeysAndSizeFromBatch(m)) {
+                if (entry == null) {
+                    continue;
+                }
+                String key = entry.getMiddle();
+                if (key == null) {
+                    if (!topicCompactionRetainNullKey) {
+                        // record delete null-key message event
+                        removed++;
+                        mxBean.addCompactionRemovedEvent(topic);
+                    }
+                } else if (entry.getRight() > 0) {
+                    if (latestForKey.put(key, entry.getLeft()) != null) {
+                        mxBean.addCompactionRemovedEvent(topic);
+                    }
+                } else {
+                    latestForKey.remove(key);
+                    removed++;
+                    mxBean.addCompactionRemovedEvent(topic);
+                }
+            }
+            return removed == batchSize;
+        } catch (IOException ioe) {
+            log.info(
+                "Error decoding batch for message {}. Whole batch will be included in output",
+                id, ioe);
+            return false;
+        }
+    }
+
+    /**
+     * Delegates to {@code RawBatchConverter.extractIdsAndKeysAndSize}.
+     *
+     * @throws IOException propagated unchanged from the converter
+     */
+    protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(
+        RawMessage msg)
+        throws IOException {
+        List<ImmutableTriple<MessageId, String, Integer>> entries =
+            RawBatchConverter.extractIdsAndKeysAndSize(msg);
+        return entries;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index 424733ad581..90132461b4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -54,7 +54,7 @@ public class PulsarCompactionServiceFactory implements CompactionServiceFactory
     }
 
     protected Compactor newCompactor() throws PulsarServerException {
-        return new TwoPhaseCompactor(pulsarService.getConfiguration(),
+        return new PublishingOrderCompactor(pulsarService.getConfiguration(),
                 pulsarService.getClient(), pulsarService.getBookKeeperClient(),
                 pulsarService.getCompactorExecutor());
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index fefa2ee959c..1b54092d9aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * <p>As the first pass caches the entire message(not just offset) for each 
key into a map,
  * this compaction could be memory intensive if the message payload is large.
  */
-public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+public class StrategicTwoPhaseCompactor extends PublishingOrderCompactor {
     private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
     private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 
* 1000;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
deleted file mode 100644
index 647c34a94ad..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.compaction;
-
-import io.netty.buffer.ByteBuf;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiPredicate;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.api.RawReader;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.RawBatchConverter;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Compaction will go through the topic in two passes. The first pass
- * selects latest offset for each key in the topic. Then the second pass
- * writes these values to a ledger.
- *
- * <p>The two passes are required to avoid holding the payloads of each of
- * the latest values in memory, as the payload can be many orders of
- * magnitude larger than a message id.
-*/
-public class TwoPhaseCompactor extends Compactor {
-    private static final Logger log = 
LoggerFactory.getLogger(TwoPhaseCompactor.class);
-    private static final int MAX_OUTSTANDING = 500;
-    private final Duration phaseOneLoopReadTimeout;
-    private final boolean topicCompactionRetainNullKey;
-
-    public TwoPhaseCompactor(ServiceConfiguration conf,
-                             PulsarClient pulsar,
-                             BookKeeper bk,
-                             ScheduledExecutorService scheduler) {
-        super(conf, pulsar, bk, scheduler);
-        phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
-        topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
-    }
-
-    @Override
-    protected CompletableFuture<Long> doCompaction(RawReader reader, 
BookKeeper bk) {
-        return reader.hasMessageAvailableAsync()
-                .thenCompose(available -> {
-                    if (available) {
-                        return phaseOne(reader).thenCompose(
-                                (r) -> phaseTwo(reader, r.from, r.to, 
r.lastReadId, r.latestForKey, bk));
-                    } else {
-                        log.info("Skip compaction of the empty topic {}", 
reader.getTopic());
-                        return CompletableFuture.completedFuture(-1L);
-                    }
-                });
-    }
-
-    private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
-        Map<String, MessageId> latestForKey = new HashMap<>();
-        CompletableFuture<PhaseOneResult> loopPromise = new 
CompletableFuture<>();
-
-        reader.getLastMessageIdAsync()
-                .thenAccept(lastMessageId -> {
-                    log.info("Commencing phase one of compaction for {}, 
reading to {}",
-                            reader.getTopic(), lastMessageId);
-                    // Each entry is processed as a whole, discard the 
batchIndex part deliberately.
-                    MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
-                    MessageIdImpl lastEntryMessageId = new 
MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(),
-                            lastImpl.getPartitionIndex());
-                    phaseOneLoop(reader, Optional.empty(), Optional.empty(), 
lastEntryMessageId, latestForKey,
-                            loopPromise);
-                }).exceptionally(ex -> {
-                    loopPromise.completeExceptionally(ex);
-                    return null;
-                });
-
-        return loopPromise;
-    }
-
-    private void phaseOneLoop(RawReader reader,
-                              Optional<MessageId> firstMessageId,
-                              Optional<MessageId> toMessageId,
-                              MessageId lastMessageId,
-                              Map<String, MessageId> latestForKey,
-                              CompletableFuture<PhaseOneResult> loopPromise) {
-        if (loopPromise.isDone()) {
-            return;
-        }
-        CompletableFuture<RawMessage> future = reader.readNextAsync();
-        FutureUtil.addTimeoutHandling(future,
-                phaseOneLoopReadTimeout, scheduler,
-                () -> FutureUtil.createTimeoutException("Timeout", getClass(), 
"phaseOneLoop(...)"));
-
-        future.thenAcceptAsync(m -> {
-            try (m) {
-                MessageId id = m.getMessageId();
-                boolean deletedMessage = false;
-                boolean replaceMessage = false;
-                mxBean.addCompactionReadOp(reader.getTopic(), 
m.getHeadersAndPayload().readableBytes());
-                MessageMetadata metadata = 
Commands.parseMessageMetadata(m.getHeadersAndPayload());
-                if (Markers.isServerOnlyMarker(metadata)) {
-                    mxBean.addCompactionRemovedEvent(reader.getTopic());
-                    deletedMessage = true;
-                } else if (RawBatchConverter.isReadableBatch(metadata)) {
-                    try {
-                        int numMessagesInBatch = 
metadata.getNumMessagesInBatch();
-                        int deleteCnt = 0;
-                        for (ImmutableTriple<MessageId, String, Integer> e : 
extractIdsAndKeysAndSizeFromBatch(m)) {
-                            if (e != null) {
-                                if (e.getMiddle() == null) {
-                                    if (!topicCompactionRetainNullKey) {
-                                        // record delete null-key message event
-                                        deleteCnt++;
-                                        
mxBean.addCompactionRemovedEvent(reader.getTopic());
-                                    }
-                                    continue;
-                                }
-                                if (e.getRight() > 0) {
-                                    MessageId old = 
latestForKey.put(e.getMiddle(), e.getLeft());
-                                    if (old != null) {
-                                        
mxBean.addCompactionRemovedEvent(reader.getTopic());
-                                    }
-                                } else {
-                                    latestForKey.remove(e.getMiddle());
-                                    deleteCnt++;
-                                    
mxBean.addCompactionRemovedEvent(reader.getTopic());
-                                }
-                            }
-                        }
-                        if (deleteCnt == numMessagesInBatch) {
-                            deletedMessage = true;
-                        }
-                    } catch (IOException ioe) {
-                        log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
-                                id, ioe);
-                    }
-                } else {
-                    Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
-                    if (keyAndSize != null) {
-                        if (keyAndSize.getRight() > 0) {
-                            MessageId old = 
latestForKey.put(keyAndSize.getLeft(), id);
-                            replaceMessage = old != null;
-                        } else {
-                            deletedMessage = true;
-                            latestForKey.remove(keyAndSize.getLeft());
-                        }
-                    } else {
-                        if (!topicCompactionRetainNullKey) {
-                            deletedMessage = true;
-                        }
-                    }
-                    if (replaceMessage || deletedMessage) {
-                        mxBean.addCompactionRemovedEvent(reader.getTopic());
-                    }
-                }
-                MessageId first = firstMessageId.orElse(deletedMessage ? null 
: id);
-                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
-                if (id.compareTo(lastMessageId) == 0) {
-                    loopPromise.complete(new PhaseOneResult(first == null ? id 
: first, to == null ? id : to,
-                            lastMessageId, latestForKey));
-                } else {
-                    phaseOneLoop(reader,
-                            Optional.ofNullable(first),
-                            Optional.ofNullable(to),
-                            lastMessageId,
-                            latestForKey, loopPromise);
-                }
-            }
-        }, scheduler).exceptionally(ex -> {
-            loopPromise.completeExceptionally(ex);
-            return null;
-        });
-    }
-
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
-            Map<String, MessageId> latestForKey, BookKeeper bk) {
-        Map<String, byte[]> metadata =
-                
LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), 
to.toByteArray());
-        return createLedger(bk, metadata).thenCompose((ledger) -> {
-            log.info("Commencing phase two of compaction for {}, from {} to 
{}, compacting {} keys to ledger {}",
-                    reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
-            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, 
latestForKey, bk, ledger);
-        });
-    }
-
-    private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,
-            MessageId lastReadId, Map<String, MessageId> latestForKey, 
BookKeeper bk, LedgerHandle ledger) {
-        CompletableFuture<Long> promise = new CompletableFuture<>();
-
-        reader.seekAsync(from).thenCompose((v) -> {
-            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-            CompletableFuture<Void> loopPromise = new CompletableFuture<>();
-            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise, MessageId.earliest);
-            return loopPromise;
-        }).thenCompose((v) -> closeLedger(ledger))
-                .thenCompose((v) -> 
reader.acknowledgeCumulativeAsync(lastReadId,
-                        Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, 
ledger.getId())))
-                .whenComplete((res, exception) -> {
-                    if (exception != null) {
-                        deleteLedger(bk, ledger).whenComplete((res2, 
exception2) -> {
-                            if (exception2 != null) {
-                                log.warn("Cleanup of ledger {} for failed", 
ledger, exception2);
-                            }
-                            // complete with original exception
-                            promise.completeExceptionally(exception);
-                        });
-                    } else {
-                        promise.complete(ledger.getId());
-                    }
-                });
-        return promise;
-    }
-
-    private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, 
MessageId> latestForKey,
-                              LedgerHandle lh, Semaphore outstanding, 
CompletableFuture<Void> promise,
-                              MessageId lastCompactedMessageId) {
-        if (promise.isDone()) {
-            return;
-        }
-        reader.readNextAsync().thenAcceptAsync(m -> {
-            if (promise.isDone()) {
-                m.close();
-                return;
-            }
-
-            if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
-                m.close();
-                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise, lastCompactedMessageId);
-                return;
-            }
-
-            try {
-                MessageId id = m.getMessageId();
-                Optional<RawMessage> messageToAdd = Optional.empty();
-                mxBean.addCompactionReadOp(reader.getTopic(), 
m.getHeadersAndPayload().readableBytes());
-                MessageMetadata metadata = 
Commands.parseMessageMetadata(m.getHeadersAndPayload());
-                if (Markers.isServerOnlyMarker(metadata)) {
-                    messageToAdd = Optional.empty();
-                } else if (RawBatchConverter.isReadableBatch(metadata)) {
-                    try {
-                        messageToAdd = rebatchMessage(reader.getTopic(),
-                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
-                    } catch (IOException ioe) {
-                        log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
-                                id, ioe);
-                        messageToAdd = Optional.of(m);
-                    }
-                } else {
-                    Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
-                    MessageId msg;
-                    if (keyAndSize == null) {
-                        messageToAdd = topicCompactionRetainNullKey ? 
Optional.of(m) : Optional.empty();
-                    } else if ((msg = latestForKey.get(keyAndSize.getLeft())) 
!= null
-                            && msg.equals(id)) { // consider message only if 
present into latestForKey map
-                        if (keyAndSize.getRight() <= 0) {
-                            promise.completeExceptionally(new 
IllegalArgumentException(
-                                    "Compaction phase found empty record from 
sorted key-map"));
-                        }
-                        messageToAdd = Optional.of(m);
-                    }
-                }
-
-                if (messageToAdd.isPresent()) {
-                    RawMessage message = messageToAdd.get();
-                    try {
-                        outstanding.acquire();
-                        CompletableFuture<Void> addFuture = 
addToCompactedLedger(lh, message, reader.getTopic())
-                                .whenComplete((res, exception2) -> {
-                                    outstanding.release();
-                                    if (exception2 != null) {
-                                        
promise.completeExceptionally(exception2);
-                                    }
-                                });
-                        if (to.equals(id)) {
-                            // make sure all inflight writes have finished
-                            outstanding.acquire(MAX_OUTSTANDING);
-                            addFuture.whenComplete((res, exception2) -> {
-                                if (exception2 == null) {
-                                    promise.complete(null);
-                                }
-                            });
-                            return;
-                        }
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                        promise.completeExceptionally(ie);
-                    } finally {
-                        if (message != m) {
-                            message.close();
-                        }
-                    }
-                } else if (to.equals(id)) {
-                    // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so,
-                    // not present under latestForKey. Complete the compaction.
-                    try {
-                        // make sure all inflight writes have finished
-                        outstanding.acquire(MAX_OUTSTANDING);
-                        promise.complete(null);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        promise.completeExceptionally(e);
-                    }
-                    return;
-                }
-                phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise, m.getMessageId());
-            } finally {
-                m.close();
-            }
-        }, scheduler).exceptionally(ex -> {
-            promise.completeExceptionally(ex);
-            return null;
-        });
-    }
-
-    protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, 
Map<String, byte[]> metadata) {
-        CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
-
-        try {
-            bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
-                    conf.getManagedLedgerDefaultWriteQuorum(),
-                    conf.getManagedLedgerDefaultAckQuorum(),
-                    Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-                    Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
-                    (rc, ledger, ctx) -> {
-                        if (rc != BKException.Code.OK) {
-                            bkf.completeExceptionally(BKException.create(rc));
-                        } else {
-                            bkf.complete(ledger);
-                        }
-                    }, null, metadata);
-        } catch (Throwable t) {
-            log.error("Encountered unexpected error when creating compaction 
ledger", t);
-            return FutureUtil.failedFuture(t);
-        }
-        return bkf;
-    }
-
-    protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle 
lh) {
-        CompletableFuture<Void> bkf = new CompletableFuture<>();
-        try {
-            bk.asyncDeleteLedger(lh.getId(),
-                    (rc, ctx) -> {
-                        if (rc != BKException.Code.OK) {
-                            bkf.completeExceptionally(BKException.create(rc));
-                        } else {
-                            bkf.complete(null);
-                        }
-                    }, null);
-        } catch (Throwable t) {
-            return FutureUtil.failedFuture(t);
-        }
-        return bkf;
-    }
-
-    protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
-        CompletableFuture<Void> bkf = new CompletableFuture<>();
-        try {
-            lh.asyncClose((rc, ledger, ctx) -> {
-                if (rc != BKException.Code.OK) {
-                    bkf.completeExceptionally(BKException.create(rc));
-                } else {
-                    bkf.complete(null);
-                }
-            }, null);
-        } catch (Throwable t) {
-            return FutureUtil.failedFuture(t);
-        }
-        return bkf;
-    }
-
-    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, 
RawMessage m, String topic) {
-        CompletableFuture<Void> bkf = new CompletableFuture<>();
-        ByteBuf serialized = m.serialize();
-        try {
-            mxBean.addCompactionWriteOp(topic, 
m.getHeadersAndPayload().readableBytes());
-            long start = System.nanoTime();
-            lh.asyncAddEntry(serialized,
-                    (rc, ledger, eid, ctx) -> {
-                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() 
- start, TimeUnit.NANOSECONDS);
-                        if (rc != BKException.Code.OK) {
-                            bkf.completeExceptionally(BKException.create(rc));
-                        } else {
-                            bkf.complete(null);
-                        }
-                    }, null);
-        } catch (Throwable t) {
-            return FutureUtil.failedFuture(t);
-        }
-        return bkf;
-    }
-
-    protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
-        ByteBuf headersAndPayload = m.getHeadersAndPayload();
-        MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-        if (msgMetadata.hasPartitionKey()) {
-            int size = headersAndPayload.readableBytes();
-            if (msgMetadata.hasUncompressedSize()) {
-                size = msgMetadata.getUncompressedSize();
-            }
-            return Pair.of(msgMetadata.getPartitionKey(), size);
-        } else {
-            return null;
-        }
-    }
-
-    protected List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
-            throws IOException {
-        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
-    }
-
-    protected Optional<RawMessage> rebatchMessage(String topic, RawMessage 
msg, BiPredicate<String, MessageId> filter,
-                                                  boolean retainNullKey)
-            throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Rebatching message {} for topic {}", 
msg.getMessageId(), topic);
-        }
-        return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
-    }
-
-    private static class PhaseOneResult {
-        final MessageId from;
-        final MessageId to; // last undeleted messageId
-        final MessageId lastReadId; // last read messageId
-        final Map<String, MessageId> latestForKey;
-
-        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, 
Map<String, MessageId> latestForKey) {
-            this.from = from;
-            this.to = to;
-            this.lastReadId = lastReadId;
-            this.latestForKey = latestForKey;
-        }
-    }
-
-    public long getPhaseOneLoopReadTimeoutInSeconds() {
-        return phaseOneLoopReadTimeout.getSeconds();
-    }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index ac1ba6bc814..45dc30d21df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -62,7 +62,7 @@ import org.testng.annotations.Test;
 public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
-    private TwoPhaseCompactor compactor;
+    private PublishingOrderCompactor compactor;
 
     @BeforeMethod
     @Override
@@ -79,7 +79,7 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
-        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 0cf32859e3d..19f42a7e057 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -109,7 +109,7 @@ import org.testng.annotations.Test;
 public class CompactionTest extends MockedPulsarServiceBaseTest {
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
-    private TwoPhaseCompactor compactor;
+    private PublishingOrderCompactor compactor;
 
     @BeforeMethod
     @Override
@@ -124,7 +124,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
-        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -147,7 +147,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         return compactor.compact(topic).get();
     }
 
-    protected TwoPhaseCompactor getCompactor() {
+    protected PublishingOrderCompactor getCompactor() {
         return compactor;
     }
 
@@ -656,7 +656,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws 
Exception {
         conf.setTopicCompactionRetainNullKey(retainNullKey);
         restartBroker();
-        FieldUtils.writeDeclaredField(compactor, 
"topicCompactionRetainNullKey", retainNullKey, true);
+        FieldUtils.writeField(compactor, "topicCompactionRetainNullKey", 
retainNullKey, true);
 
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 1c09dc0d643..5cf7d33200d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -101,7 +101,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(
                 this.conf, null, null, Optional.empty(), null).get();
-        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
 
@@ -127,7 +127,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         return compactor;
     }
 
-    private List<String> compactAndVerify(String topic, Map<String, byte[]> 
expected, boolean checkMetrics)
+    protected List<String> compactAndVerify(String topic, Map<String, byte[]> 
expected, boolean checkMetrics)
             throws Exception {
 
         long compactedLedgerId = compact(topic);
@@ -361,7 +361,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(mockClient.getCnxPool()).thenReturn(connectionPool);
-        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, 
mockClient,
+        PublishingOrderCompactor compactor = new 
PublishingOrderCompactor(configuration, mockClient,
                 Mockito.mock(BookKeeper.class), compactionScheduler);
         Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 
60);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
new file mode 100644
index 00000000000..8fba0983123
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class EventTimeOrderCompactorTest extends CompactorTest { // CompactorTest fixture with an EventTimeOrderCompactor plugged in through compact()/getCompactor()
+
+  private EventTimeOrderCompactor compactor; // compactor under test, assigned in setup()
+
+  @BeforeMethod
+  @Override
+  public void setup() throws Exception {
+    super.setup(); // parent setup first; conf, pulsarClient, bk and compactionScheduler used below are not declared in this class
+    compactor = new EventTimeOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+  }
+
+  @Override
+  protected long compact(String topic) throws ExecutionException, 
InterruptedException {
+    return compactor.compact(topic).get(); // waits for the result of compact(topic) on this class's compactor
+  }
+
+  @Override
+  protected Compactor getCompactor() {
+    return compactor; // hands out the EventTimeOrderCompactor built in setup()
+  }
+
+  @Test
+  public void testCompactedOutByEventTime() throws Exception { // two admin-triggered compactions; checks metrics after the first and event times after the second
+    String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testCompactedOutByEventTime");
+    this.restartBroker();
+
+    @Cleanup
+    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+        .enableBatching(true).topic(topicName).batchingMaxMessages(3).create(); // batching on, at most 3 messages per batch
+
+    producer.newMessage().key("K1").value("V1").eventTime(1L).sendAsync();
+    producer.newMessage().key("K2").value("V2").eventTime(1L).sendAsync();
+    producer.newMessage().key("K2").value(null).eventTime(2L).sendAsync(); // null value for K2 with a later event time (2 > 1)
+    producer.flush();
+
+    admin.topics().triggerCompaction(topicName); // first compaction, started through the admin API
+
+    Awaitility.await().untilAsserted(() -> { // poll until the reported compaction status is SUCCESS
+      Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+          LongRunningProcessStatus.Status.SUCCESS);
+    });
+
+    var attributes = Attributes.builder() // attribute set used to select this topic's metric data points
+        .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+        .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property")
+        .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns")
+        .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+        .build();
+    var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_REMOVED_COUNTER, attributes, 1); // removed counter must equal 1
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+            .putAll(attributes)
+            .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success")
+            .build(),
+        1); // one operation recorded with status "success"
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+            .putAll(attributes)
+            .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure")
+            .build(),
+        0); // none recorded with status "failure"
+    assertMetricDoubleSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_DURATION_SECONDS, attributes,
+        actual -> assertThat(actual).isPositive()); // duration and byte metrics are only checked for being positive
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_BYTES_IN_COUNTER, attributes,
+        actual -> assertThat(actual).isPositive());
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_BYTES_OUT_COUNTER, attributes,
+        actual -> assertThat(actual).isPositive());
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_ENTRIES_COUNTER, attributes, 1); // entries counter must equal 1
+    assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_BYTES_COUNTER, attributes,
+        actual -> assertThat(actual).isPositive());
+
+    producer.newMessage().key("K1").eventTime(2L).value("V1-2").sendAsync(); // K1 again, event time 2 (the first K1 message had 1)
+    producer.flush();
+
+    admin.topics().triggerCompaction(topicName); // second compaction
+
+    Awaitility.await().untilAsserted(() -> {
+      Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+          LongRunningProcessStatus.Status.SUCCESS);
+    });
+
+    @Cleanup
+    Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+        .subscriptionName("reader-test")
+        .topic(topicName)
+        .readCompacted(true) // read the compacted view, starting from the earliest message
+        .startMessageId(MessageId.earliest)
+        .create();
+    while (reader.hasMessageAvailable()) { // NOTE(review): nothing is asserted if this loop never runs — consider also checking the message count
+      Message<String> message = reader.readNext(3, TimeUnit.SECONDS); // NOTE(review): presumably null on timeout, which would NPE on the next line — confirm
+      Assert.assertEquals(message.getEventTime(), 2L); // every message returned must carry event time 2
+    }
+  }
+
+  @Test
+  public void testCompactWithEventTimeAddCompact() throws Exception { // compact, publish one more message, compact again; verify both rounds
+    String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+    @Cleanup
+    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+        .enableBatching(false) // unbatched: each send() below publishes individually
+        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+        .create();
+
+    Map<String, byte[]> expected = new HashMap<>(); // key -> payload expected after compaction
+
+    producer.newMessage()
+        .key("a")
+        .eventTime(1L)
+        .value("A_1".getBytes())
+        .send();
+    producer.newMessage()
+        .key("b")
+        .eventTime(1L)
+        .value("B_1".getBytes())
+        .send();
+    producer.newMessage()
+        .key("a")
+        .eventTime(2L) // second message for key "a", with the higher event time
+        .value("A_2".getBytes())
+        .send();
+    expected.put("a", "A_2".getBytes());
+    expected.put("b", "B_1".getBytes());
+
+    compactAndVerify(topic, new HashMap<>(expected), false); // a copy is passed so `expected` can be reused below; presumably compactAndVerify consumes its map — confirm
+
+    producer.newMessage()
+        .key("b")
+        .eventTime(2L) // now key "b" also gets a message with event time 2
+        .value("B_2".getBytes())
+        .send();
+    expected.put("b", "B_2".getBytes());
+
+    compactAndVerify(topic, expected, false); // second round, metrics check disabled again (checkMetrics = false)
+  }
+
+  @Override
+  @Test
+  public void testPhaseOneLoopTimeConfiguration() { // same check as the overridden CompactorTest method, but constructing an EventTimeOrderCompactor
+    ServiceConfiguration configuration = new ServiceConfiguration();
+    configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+    PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+    ConnectionPool connectionPool = mock(ConnectionPool.class);
+    when(mockClient.getCnxPool()).thenReturn(connectionPool); // the mocked client returns a mocked connection pool from getCnxPool()
+    EventTimeOrderCompactor compactor = new 
EventTimeOrderCompactor(configuration, mockClient,
+        Mockito.mock(BookKeeper.class), compactionScheduler);
+    Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); // the configured 60 seconds must be reported by the getter
+  }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 54563431052..d1ff46cbc02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -74,7 +74,7 @@ public class StrategicCompactionTest extends CompactionTest {
     }
 
     @Override
-    protected TwoPhaseCompactor getCompactor() {
+    protected PublishingOrderCompactor getCompactor() {
         return compactor;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index 2aa09309d39..9f33479ce4c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -53,7 +53,7 @@ public class TopicCompactionServiceTest extends 
MockedPulsarServiceBaseTest {
 
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
-    private TwoPhaseCompactor compactor;
+    private PublishingOrderCompactor compactor;
 
     @BeforeMethod
     @Override
@@ -73,7 +73,7 @@ public class TopicCompactionServiceTest extends 
MockedPulsarServiceBaseTest {
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(
                 this.conf, null, null, Optional.empty(), null).get();
-        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index ab4925bfeb8..74c2a93b84e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -62,7 +62,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.PublishingOrderCompactor;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -259,7 +259,7 @@ public class PulsarFunctionE2ETest extends 
AbstractPulsarE2ETest {
         @Cleanup("shutdownNow")
         ScheduledExecutorService compactionScheduler = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
+        PublishingOrderCompactor twoPhaseCompactor = new 
PublishingOrderCompactor(config,
                 pulsarClient, pulsar.getBookKeeperClient(), 
compactionScheduler);
         twoPhaseCompactor.compact(sourceTopic).get();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 7edc87bb996..be2b377a9cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -49,7 +49,7 @@ import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
+import org.apache.pulsar.compaction.PublishingOrderCompactor;
 import org.apache.pulsar.functions.LocalRunner;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -107,7 +107,7 @@ public class PulsarSinkE2ETest extends 
AbstractPulsarE2ETest {
         @Cleanup("shutdownNow")
         ScheduledExecutorService compactionScheduler = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
-        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config,
+        PublishingOrderCompactor twoPhaseCompactor = new 
PublishingOrderCompactor(config,
                 pulsarClient, pulsar.getBookKeeperClient(), 
compactionScheduler);
         twoPhaseCompactor.compact(sourceTopic).get();
 

Reply via email to