This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ba855c3 Add batching support to compaction (#1427)
ba855c3 is described below
commit ba855c36e786c0a38d2caee00e1dbaa707b4467d
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Mar 28 02:05:30 2018 +0200
Add batching support to compaction (#1427)
With this patch, compaction can retain only some of the submessages
in a batch, keeping each submessage only if it is the latest for its key.
---
.../pulsar/compaction/TwoPhaseCompactor.java | 97 +++++++++++++-------
.../apache/pulsar/compaction/CompactionTest.java | 100 +++++++++++++++++++++
2 files changed, 163 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3e2d259..f12870b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.compaction;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +46,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +87,8 @@ public class TwoPhaseCompactor extends Compactor {
if (exception != null) {
loopPromise.completeExceptionally(exception);
} else {
- log.info("Commencing phase one of compaction for {},
reading to {}", reader, lastMessageId);
+ log.info("Commencing phase one of compaction for {},
reading to {}",
+ reader.getTopic(), lastMessageId);
phaseOneLoop(reader, Optional.empty(), lastMessageId,
latestForKey, loopPromise);
}
});
@@ -108,8 +113,18 @@ public class TwoPhaseCompactor extends Compactor {
return;
}
MessageId id = m.getMessageId();
- String key = extractKey(m);
- latestForKey.put(key, id);
+ if (RawBatchConverter.isBatch(m)) {
+ try {
+ RawBatchConverter.extractIdsAndKeys(m)
+ .forEach(e ->
latestForKey.put(e.getRight(), e.getLeft()));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}.
Whole batch will be included in output",
+ id, ioe);
+ }
+ } else {
+ String key = extractKey(m);
+ latestForKey.put(key, id);
+ }
if (id.compareTo(lastMessageId) == 0) {
loopPromise.complete(new
PhaseOneResult(firstMessageId.orElse(id),
@@ -140,7 +155,7 @@ public class TwoPhaseCompactor extends Compactor {
return createLedger(bk).thenCompose((ledger) -> {
log.info("Commencing phase two of compaction for {}, from {}
to {}, compacting {} keys to ledger {}",
- reader, from, to, latestForKey.size(),
ledger.getId());
+ reader.getTopic(), from, to, latestForKey.size(),
ledger.getId());
return phaseTwoSeekThenLoop(reader, from, to, latestForKey,
bk, ledger);
});
}
@@ -179,41 +194,55 @@ public class TwoPhaseCompactor extends Compactor {
LedgerHandle lh, Semaphore outstanding,
CompletableFuture<Void> promise) {
reader.readNextAsync().whenCompleteAsync(
(m, exception) -> {
- try {
- if (exception != null) {
- promise.completeExceptionally(exception);
- return;
- } else if (promise.isDone()) {
- return;
+ if (exception != null) {
+ promise.completeExceptionally(exception);
+ return;
+ } else if (promise.isDone()) {
+ return;
+ }
+ MessageId id = m.getMessageId();
+ Optional<RawMessage> messageToAdd = Optional.empty();
+ if (RawBatchConverter.isBatch(m)) {
+ try {
+ messageToAdd = RawBatchConverter.rebatchMessage(
+ m, (key, subid) ->
latestForKey.get(key).equals(subid));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}.
Whole batch will be included in output",
+ id, ioe);
+ messageToAdd = Optional.of(m);
}
- MessageId id = m.getMessageId();
+ } else {
String key = extractKey(m);
-
if (latestForKey.get(key).equals(id)) {
-
- outstanding.acquire();
- CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, m)
- .whenComplete((res, exception2) -> {
- outstanding.release();
- if (exception2 != null) {
-
promise.completeExceptionally(exception2);
- }
- });
- if (to.equals(id)) {
- addFuture.whenComplete((res, exception2) -> {
- if (exception2 == null) {
- promise.complete(null);
- }
- });
- }
+ messageToAdd = Optional.of(m);
+ } else {
+ m.close();
}
- phaseTwoLoop(reader, to, latestForKey, lh,
outstanding, promise);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(ie);
- } finally {
- m.close();
}
+
+ messageToAdd.ifPresent((toAdd) -> {
+ try {
+ outstanding.acquire();
+ CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, toAdd)
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+
promise.completeExceptionally(exception2);
+ }
+ });
+ if (to.equals(id)) {
+ addFuture.whenComplete((res, exception2)
-> {
+ if (exception2 == null) {
+ promise.complete(null);
+ }
+ });
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(ie);
+ }
+ });
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise);
}, scheduler);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 7a22bb1..b2c2bbe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -36,8 +36,10 @@ import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
+
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -342,4 +344,102 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
}
}
+
+ @Test
+ public void testBatchMessageIdsDontChange() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ // subscribe before sending anything, so that we get all messages
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe().close();
+
+ try (Producer producer =
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+ .enableBatching(true).batchingMaxMessages(3)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+ .setContent("my-message-1".getBytes()).build());
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+ .setContent("my-message-2".getBytes()).build());
+ producer.sendAsync(MessageBuilder.create()
+ .setKey("key2")
+
.setContent("my-message-3".getBytes()).build()).get();
+ }
+
+ // Read messages before compaction to get ids
+ List<Message> messages = new ArrayList<>();
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()) {
+ messages.add(consumer.receive());
+ messages.add(consumer.receive());
+ messages.add(consumer.receive());
+ }
+
+ // Ensure all messages are in same batch
+
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+
((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId());
+
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+
((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId());
+
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+
((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId());
+
Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+
((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
+
+ // compact the topic
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic).get();
+
+ // Check that messages after compaction have same ids
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()){
+ Message message1 = consumer.receive();
+ Assert.assertEquals(message1.getKey(), "key1");
+ Assert.assertEquals(new String(message1.getData()),
"my-message-1");
+ Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
+
+ Message message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()),
"my-message-3");
+ Assert.assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
+ }
+ }
+
+ @Test
+ public void testWholeBatchCompactedOut() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ // subscribe before sending anything, so that we get all messages
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe().close();
+
+ try (Producer producerNormal =
pulsarClient.newProducer().topic(topic).create();
+ Producer producerBatch =
pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+ .enableBatching(true).batchingMaxMessages(3)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-1".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-2".getBytes()).build());
+ producerBatch.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-3".getBytes()).build()).get();
+ producerNormal.sendAsync(MessageBuilder.create()
+ .setKey("key1")
+
.setContent("my-message-4".getBytes()).build()).get();
+ }
+
+ // compact the topic
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic).get();
+
+ try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub1").readCompacted(true).subscribe()){
+ Message message = consumer.receive();
+ Assert.assertEquals(message.getKey(), "key1");
+ Assert.assertEquals(new String(message.getData()), "my-message-4");
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].