codelipenghui commented on code in PR #18195:
URL: https://github.com/apache/pulsar/pull/18195#discussion_r1030366755


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction goes through the topic in two passes. The first pass
+ * selects the valid message (as defined by TopicCompactionStrategy.isValid())
+ * for each key in the topic. The second pass then writes these messages
+ * to a ledger.
+ *
+ * <p>Because the first pass caches the entire message (not just its offset) for each key in a map,
+ * this compaction can be memory intensive if message payloads are large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+    private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+    // NOTE(review): not referenced in the visible part of this class; presumably caps the
+    // number of in-flight operations during compaction — TODO confirm against its usage.
+    private static final int MAX_OUTSTANDING = 500;
+    // Phase-one loop read timeout, initialized in the constructor from
+    // conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds().
+    private final Duration phaseOneLoopReadTimeout;
+    // Batch container created in the constructor with maxNumMessagesInBatch. It is a single
+    // instance field, so every compact() call on this compactor uses the same container
+    // (compact() configures its CryptoKeyReader).
+    private final RawBatchMessageContainerImpl batchMessageContainer;
+
+    /**
+     * Creates a strategic two-phase compactor.
+     *
+     * @param conf                  broker configuration; also supplies the phase-one loop timeout
+     * @param pulsar                client passed to the parent compactor
+     * @param bk                    BookKeeper client passed to the parent compactor
+     * @param scheduler             executor passed to the parent compactor
+     * @param maxNumMessagesInBatch limit handed to the {@code RawBatchMessageContainerImpl}
+     */
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch) {
+        super(conf, pulsar, bk, scheduler);
+        long phaseOneLoopSeconds = conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds();
+        this.phaseOneLoopReadTimeout = Duration.ofSeconds(phaseOneLoopSeconds);
+        this.batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+    }
+
+    /**
+     * Creates a compactor by delegating to the five-argument constructor with
+     * {@code maxNumMessagesInBatch = -1}.
+     *
+     * <p>NOTE(review): -1 presumably means "keep the batch container's default limit";
+     * confirm in RawBatchMessageContainerImpl.
+     */
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler) {
+        this(conf, pulsar, bk, scheduler, /* maxNumMessagesInBatch */ -1);
+    }
+
+    /**
+     * Not supported: this compactor needs a {@link TopicCompactionStrategy} to pick the
+     * message kept for each key. Use {@code compact(topic, strategy)} or
+     * {@code compact(topic, strategy, cryptoKeyReader)} instead.
+     *
+     * @param topic the requested topic (only used in the exception message)
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    public CompletableFuture<Long> compact(String topic) {
+        throw new UnsupportedOperationException(
+                "StrategicTwoPhaseCompactor requires a TopicCompactionStrategy; "
+                        + "use compact(topic, strategy) instead. topic=" + topic);
+    }
+
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> 
strategy) {
+        return compact(topic, strategy, null);
+    }
+
+    /**
+     * Compacts {@code topic} using {@code strategy}, optionally decrypting with
+     * {@code cryptoKeyReader}.
+     *
+     * <p>A {@code CompactionReaderImpl} is created for the topic with the strategy's schema.
+     * Once its consumer future completes, {@code compactAndCloseReader} runs on the scheduler.
+     *
+     * @param topic           topic to compact
+     * @param strategy        compaction strategy; also supplies the schema used to read the topic
+     * @param cryptoKeyReader key reader for encrypted messages, or {@code null} if none
+     * @return a future completed with the result of {@code compactAndCloseReader}
+     */
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> strategy,
+                                               CryptoKeyReader cryptoKeyReader) {
+        CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
+        // batchMessageContainer is a single instance field shared by every compact() call, so
+        // always overwrite its reader (including with null). Otherwise a reader supplied to an
+        // earlier compaction would silently leak into a later one that did not provide one.
+        batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+        CompactionReaderImpl<T> reader = CompactionReaderImpl.create(
+                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader);
+
+        // Begin compacting only after the reader's consumer is ready.
+        return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
+    }
+
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, 
TopicCompactionStrategy strategy) {
+
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be 
DelayedAckReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()

Review Comment:
   Sorry, I missed this part when reviewing the proposal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to