This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ba855c3 Add batching support to compaction (#1427) ba855c3 is described below commit ba855c36e786c0a38d2caee00e1dbaa707b4467d Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed Mar 28 02:05:30 2018 +0200 Add batching support to compaction (#1427) With this patch compaction can select to keep only some of the submessages from a batch. --- .../pulsar/compaction/TwoPhaseCompactor.java | 97 +++++++++++++------- .../apache/pulsar/compaction/CompactionTest.java | 100 +++++++++++++++++++++ 2 files changed, 163 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 3e2d259..f12870b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -21,8 +21,11 @@ package org.apache.pulsar.compaction; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -43,6 +46,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawBatchConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +87,8 @@ public class TwoPhaseCompactor extends Compactor { if (exception != null) { loopPromise.completeExceptionally(exception); } else { - log.info("Commencing phase one of compaction for {}, reading to {}", reader, lastMessageId); + log.info("Commencing phase one of compaction for {}, reading to 
{}", + reader.getTopic(), lastMessageId); phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise); } }); @@ -108,8 +113,18 @@ public class TwoPhaseCompactor extends Compactor { return; } MessageId id = m.getMessageId(); - String key = extractKey(m); - latestForKey.put(key, id); + if (RawBatchConverter.isBatch(m)) { + try { + RawBatchConverter.extractIdsAndKeys(m) + .forEach(e -> latestForKey.put(e.getRight(), e.getLeft())); + } catch (IOException ioe) { + log.info("Error decoding batch for message {}. Whole batch will be included in output", + id, ioe); + } + } else { + String key = extractKey(m); + latestForKey.put(key, id); + } if (id.compareTo(lastMessageId) == 0) { loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id), @@ -140,7 +155,7 @@ public class TwoPhaseCompactor extends Compactor { return createLedger(bk).thenCompose((ledger) -> { log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", - reader, from, to, latestForKey.size(), ledger.getId()); + reader.getTopic(), from, to, latestForKey.size(), ledger.getId()); return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger); }); } @@ -179,41 +194,55 @@ public class TwoPhaseCompactor extends Compactor { LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) { reader.readNextAsync().whenCompleteAsync( (m, exception) -> { - try { - if (exception != null) { - promise.completeExceptionally(exception); - return; - } else if (promise.isDone()) { - return; + if (exception != null) { + promise.completeExceptionally(exception); + return; + } else if (promise.isDone()) { + return; + } + MessageId id = m.getMessageId(); + Optional<RawMessage> messageToAdd = Optional.empty(); + if (RawBatchConverter.isBatch(m)) { + try { + messageToAdd = RawBatchConverter.rebatchMessage( + m, (key, subid) -> latestForKey.get(key).equals(subid)); + } catch (IOException ioe) { + log.info("Error decoding batch for message {}. 
Whole batch will be included in output", + id, ioe); + messageToAdd = Optional.of(m); } - MessageId id = m.getMessageId(); + } else { String key = extractKey(m); - if (latestForKey.get(key).equals(id)) { - - outstanding.acquire(); - CompletableFuture<Void> addFuture = addToCompactedLedger(lh, m) - .whenComplete((res, exception2) -> { - outstanding.release(); - if (exception2 != null) { - promise.completeExceptionally(exception2); - } - }); - if (to.equals(id)) { - addFuture.whenComplete((res, exception2) -> { - if (exception2 == null) { - promise.complete(null); - } - }); - } + messageToAdd = Optional.of(m); + } else { + m.close(); } - phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(ie); - } finally { - m.close(); } + + messageToAdd.ifPresent((toAdd) -> { + try { + outstanding.acquire(); + CompletableFuture<Void> addFuture = addToCompactedLedger(lh, toAdd) + .whenComplete((res, exception2) -> { + outstanding.release(); + if (exception2 != null) { + promise.completeExceptionally(exception2); + } + }); + if (to.equals(id)) { + addFuture.whenComplete((res, exception2) -> { + if (exception2 == null) { + promise.complete(null); + } + }); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(ie); + } + }); + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); }, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 7a22bb1..b2c2bbe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -36,8 +36,10 @@ import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; import 
org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; + import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -342,4 +344,102 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId()); } } + + @Test + public void testBatchMessageIdsDontChange() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) + .enableBatching(true).batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + producer.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-1".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-2".getBytes()).build()); + producer.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-3".getBytes()).build()).get(); + } + + // Read messages before compaction to get ids + List<Message> messages = new ArrayList<>(); + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()) { + messages.add(consumer.receive()); + messages.add(consumer.receive()); + messages.add(consumer.receive()); + } + + // Ensure all messages are in same batch + Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(), + ((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId()); + 
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(), + ((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId()); + Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(), + ((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId()); + Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(), + ((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId()); + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + // Check that messages after compaction have same ids + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertEquals(message1.getKey(), "key1"); + Assert.assertEquals(new String(message1.getData()), "my-message-1"); + Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId()); + + Message message2 = consumer.receive(); + Assert.assertEquals(message2.getKey(), "key2"); + Assert.assertEquals(new String(message2.getData()), "my-message-3"); + Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId()); + } + } + + @Test + public void testWholeBatchCompactedOut() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); + Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) + .enableBatching(true).batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key1") + 
.setContent("my-message-1".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-2".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-3".getBytes()).build()).get(); + producerNormal.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-4".getBytes()).build()).get(); + } + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message = consumer.receive(); + Assert.assertEquals(message.getKey(), "key1"); + Assert.assertEquals(new String(message.getData()), "my-message-4"); + } + } } -- To stop receiving notification emails like this one, please contact mme...@apache.org.