This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 346f831 Compaction allows keyless messages to pass through (#1514) 346f831 is described below commit 346f8312484cc4389d96f2aeefc0be6393c4fcc2 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Fri Apr 6 23:35:56 2018 +0200 Compaction allows keyless messages to pass through (#1514) If a message has no key it's impossible to know if a later message supersedes it, so in this case, it should be simply passed through compaction (i.e. it should appear when reading from a compacted topic). --- .../pulsar/client/impl/RawBatchConverter.java | 7 ++- .../pulsar/compaction/TwoPhaseCompactor.java | 14 ++++- .../apache/pulsar/compaction/CompactionTest.java | 70 ++++++++++++++++++++++ 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index fa62982..4e628bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -105,12 +105,15 @@ public class RawBatchConverter { ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadataBuilder, 0, batchSize); - String key = singleMessageMetadataBuilder.getPartitionKey(); MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i); - if (filter.test(key, id)) { + if (!singleMessageMetadataBuilder.hasPartitionKey()) { + messagesRetained++; + Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, + singleMessagePayload, batchBuffer); + } else if (filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) { messagesRetained++; Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, 
singleMessagePayload, batchBuffer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index f12870b..fbad47e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -123,7 +123,9 @@ public class TwoPhaseCompactor extends Compactor { } } else { String key = extractKey(m); - latestForKey.put(key, id); + if (key != null) { + latestForKey.put(key, id); + } } if (id.compareTo(lastMessageId) == 0) { @@ -213,7 +215,9 @@ public class TwoPhaseCompactor extends Compactor { } } else { String key = extractKey(m); - if (latestForKey.get(key).equals(id)) { + if (key == null) { // pass through messages without a key + messageToAdd = Optional.of(m); + } else if (latestForKey.get(key).equals(id)) { messageToAdd = Optional.of(m); } else { m.close(); @@ -306,7 +310,11 @@ public class TwoPhaseCompactor extends Compactor { private static String extractKey(RawMessage m) { ByteBuf headersAndPayload = m.getHeadersAndPayload(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - return msgMetadata.getPartitionKey(); + if (msgMetadata.hasPartitionKey()) { + return msgMetadata.getPartitionKey(); + } else { + return null; + } } private static class PhaseOneResult { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index af39e12..a0f0f97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -442,4 +442,74 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(new String(message.getData()), "my-message-4"); } } + + @Test + public void 
testKeyLessMessagesPassThrough() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); + Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) + .enableBatching(true).batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + producerNormal.sendAsync(MessageBuilder.create() + .setContent("my-message-1".getBytes()).build()).get(); + + producerBatch.sendAsync(MessageBuilder.create() + .setContent("my-message-2".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-3".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key1") + .setContent("my-message-4".getBytes()).build()).get(); + + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-5".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setKey("key2") + .setContent("my-message-6".getBytes()).build()); + producerBatch.sendAsync(MessageBuilder.create() + .setContent("my-message-7".getBytes()).build()).get(); + + producerNormal.sendAsync(MessageBuilder.create() + .setContent("my-message-8".getBytes()).build()).get(); + } + + // compact the topic + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()){ + Message message1 = consumer.receive(); + Assert.assertFalse(message1.hasKey()); + Assert.assertEquals(new String(message1.getData()), "my-message-1"); + + Message message2 = consumer.receive(); + 
Assert.assertFalse(message2.hasKey()); + Assert.assertEquals(new String(message2.getData()), "my-message-2"); + + Message message3 = consumer.receive(); + Assert.assertEquals(message3.getKey(), "key1"); + Assert.assertEquals(new String(message3.getData()), "my-message-4"); + + Message message4 = consumer.receive(); + Assert.assertEquals(message4.getKey(), "key2"); + Assert.assertEquals(new String(message4.getData()), "my-message-6"); + + Message message5 = consumer.receive(); + Assert.assertFalse(message5.hasKey()); + Assert.assertEquals(new String(message5.getData()), "my-message-7"); + + Message message6 = consumer.receive(); + Assert.assertFalse(message6.hasKey()); + Assert.assertEquals(new String(message6.getData()), "my-message-8"); + } + } + } -- To stop receiving notification emails like this one, please contact mme...@apache.org.