This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 346f831 Compaction allows keyless messages to pass through (#1514)
346f831 is described below
commit 346f8312484cc4389d96f2aeefc0be6393c4fcc2
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Apr 6 23:35:56 2018 +0200
Compaction allows keyless messages to pass through (#1514)
If a message has no key it's impossible to know if a later message
supercedes it, so in this case, it should be simply passed through
compaction (i.e. it should appear when reading from a compacted
topic).
---
.../pulsar/client/impl/RawBatchConverter.java | 7 ++-
.../pulsar/compaction/TwoPhaseCompactor.java | 14 ++++-
.../apache/pulsar/compaction/CompactionTest.java | 70 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index fa62982..4e628bc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -105,12 +105,15 @@ public class RawBatchConverter {
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(payload,
singleMessageMetadataBuilder,
0, batchSize);
- String key = singleMessageMetadataBuilder.getPartitionKey();
MessageId id = new
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
- if (filter.test(key, id)) {
+ if (!singleMessageMetadataBuilder.hasPartitionKey()) {
+ messagesRetained++;
+
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
+
singleMessagePayload, batchBuffer);
+ } else if
(filter.test(singleMessageMetadataBuilder.getPartitionKey(), id)) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
singleMessagePayload, batchBuffer);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index f12870b..fbad47e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -123,7 +123,9 @@ public class TwoPhaseCompactor extends Compactor {
}
} else {
String key = extractKey(m);
- latestForKey.put(key, id);
+ if (key != null) {
+ latestForKey.put(key, id);
+ }
}
if (id.compareTo(lastMessageId) == 0) {
@@ -213,7 +215,9 @@ public class TwoPhaseCompactor extends Compactor {
}
} else {
String key = extractKey(m);
- if (latestForKey.get(key).equals(id)) {
+ if (key == null) { // pass through messages without a
key
+ messageToAdd = Optional.of(m);
+ } else if (latestForKey.get(key).equals(id)) {
messageToAdd = Optional.of(m);
} else {
m.close();
@@ -306,7 +310,11 @@ public class TwoPhaseCompactor extends Compactor {
private static String extractKey(RawMessage m) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
- return msgMetadata.getPartitionKey();
+ if (msgMetadata.hasPartitionKey()) {
+ return msgMetadata.getPartitionKey();
+ } else {
+ return null;
+ }
}
private static class PhaseOneResult {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index af39e12..a0f0f97 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -442,4 +442,74 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(new String(message.getData()), "my-message-4");
}
}
+
+ @Test
+ public void testKeyLessMessagesPassThrough() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ // subscribe before sending anything, so that we get all messages
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe().close();
+
+ try (Producer producerNormal =
pulsarClient.newProducer().topic(topic).create();
+ Producer producerBatch =
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+ .enableBatching(true).batchingMaxMessages(3)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+ producerNormal.sendAsync(MessageBuilder.create()
+
.setContent("my-message-1".getBytes()).build()).get();
+
+ producerBatch.sendAsync(MessageBuilder.create()
+
.setContent("my-message-2".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-3".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-4".getBytes()).build()).get();
+
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+
.setContent("my-message-5".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+
.setContent("my-message-6".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+
.setContent("my-message-7".getBytes()).build()).get();
+
+ producerNormal.sendAsync(MessageBuilder.create()
+
.setContent("my-message-8".getBytes()).build()).get();
+ }
+
+ // compact the topic
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic).get();
+
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()){
+ Message message1 = consumer.receive();
+ Assert.assertFalse(message1.hasKey());
+ Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+
+ Message message2 = consumer.receive();
+ Assert.assertFalse(message2.hasKey());
+ Assert.assertEquals(new String(message2.getData()),
"my-message-2");
+
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key1");
+ Assert.assertEquals(new String(message3.getData()),
"my-message-4");
+
+ Message message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()),
"my-message-6");
+
+ Message message5 = consumer.receive();
+ Assert.assertFalse(message5.hasKey());
+ Assert.assertEquals(new String(message5.getData()),
"my-message-7");
+
+ Message message6 = consumer.receive();
+ Assert.assertFalse(message6.hasKey());
+ Assert.assertEquals(new String(message6.getData()),
"my-message-8");
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
[email protected].