This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba855c3  Add batching support to compaction (#1427)
ba855c3 is described below

commit ba855c36e786c0a38d2caee00e1dbaa707b4467d
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 28 02:05:30 2018 +0200

    Add batching support to compaction (#1427)
    
    With this patch compaction can select to keep only some of the
    submessages from a batch.
---
 .../pulsar/compaction/TwoPhaseCompactor.java       |  97 +++++++++++++-------
 .../apache/pulsar/compaction/CompactionTest.java   | 100 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3e2d259..f12870b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.compaction;
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -43,6 +46,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.RawBatchConverter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +87,8 @@ public class TwoPhaseCompactor extends Compactor {
                     if (exception != null) {
                         loopPromise.completeExceptionally(exception);
                     } else {
-                        log.info("Commencing phase one of compaction for {}, reading to {}", reader, lastMessageId);
+                        log.info("Commencing phase one of compaction for {}, reading to {}",
+                                 reader.getTopic(), lastMessageId);
                         phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise);
                     }
                 });
@@ -108,8 +113,18 @@ public class TwoPhaseCompactor extends Compactor {
                             return;
                         }
                         MessageId id = m.getMessageId();
-                        String key = extractKey(m);
-                        latestForKey.put(key, id);
+                        if (RawBatchConverter.isBatch(m)) {
+                            try {
+                                RawBatchConverter.extractIdsAndKeys(m)
+                                    .forEach(e -> latestForKey.put(e.getRight(), e.getLeft()));
+                            } catch (IOException ioe) {
+                                log.info("Error decoding batch for message {}. Whole batch will be included in output",
+                                         id, ioe);
+                            }
+                        } else {
+                            String key = extractKey(m);
+                            latestForKey.put(key, id);
+                        }
 
                         if (id.compareTo(lastMessageId) == 0) {
                             loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id),
@@ -140,7 +155,7 @@ public class TwoPhaseCompactor extends Compactor {
 
         return createLedger(bk).thenCompose((ledger) -> {
                 log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
-                         reader, from, to, latestForKey.size(), ledger.getId());
+                         reader.getTopic(), from, to, latestForKey.size(), ledger.getId());
                 return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger);
             });
     }
@@ -179,41 +194,55 @@ public class TwoPhaseCompactor extends Compactor {
                               LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
         reader.readNextAsync().whenCompleteAsync(
                 (m, exception) -> {
-                    try {
-                        if (exception != null) {
-                            promise.completeExceptionally(exception);
-                            return;
-                        } else if (promise.isDone()) {
-                            return;
+                    if (exception != null) {
+                        promise.completeExceptionally(exception);
+                        return;
+                    } else if (promise.isDone()) {
+                        return;
+                    }
+                    MessageId id = m.getMessageId();
+                    Optional<RawMessage> messageToAdd = Optional.empty();
+                    if (RawBatchConverter.isBatch(m)) {
+                        try {
+                            messageToAdd = RawBatchConverter.rebatchMessage(
+                                    m, (key, subid) -> latestForKey.get(key).equals(subid));
+                        } catch (IOException ioe) {
+                            log.info("Error decoding batch for message {}. Whole batch will be included in output",
+                                     id, ioe);
+                            messageToAdd = Optional.of(m);
                         }
-                        MessageId id = m.getMessageId();
+                    } else {
                         String key = extractKey(m);
-
                         if (latestForKey.get(key).equals(id)) {
-
-                            outstanding.acquire();
-                            CompletableFuture<Void> addFuture = addToCompactedLedger(lh, m)
-                                .whenComplete((res, exception2) -> {
-                                        outstanding.release();
-                                        if (exception2 != null) {
-                                            promise.completeExceptionally(exception2);
-                                        }
-                                    });
-                            if (to.equals(id)) {
-                                addFuture.whenComplete((res, exception2) -> {
-                                        if (exception2 == null) {
-                                            promise.complete(null);
-                                        }
-                                    });
-                            }
+                            messageToAdd = Optional.of(m);
+                        } else {
+                            m.close();
                         }
-                        phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                        promise.completeExceptionally(ie);
-                    } finally {
-                        m.close();
                     }
+
+                    messageToAdd.ifPresent((toAdd) -> {
+                            try {
+                                outstanding.acquire();
+                                CompletableFuture<Void> addFuture = addToCompactedLedger(lh, toAdd)
+                                    .whenComplete((res, exception2) -> {
+                                            outstanding.release();
+                                            if (exception2 != null) {
+                                                promise.completeExceptionally(exception2);
+                                            }
+                                        });
+                                if (to.equals(id)) {
+                                    addFuture.whenComplete((res, exception2) -> {
+                                            if (exception2 == null) {
+                                                promise.complete(null);
+                                            }
+                                        });
+                                }
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                promise.completeExceptionally(ie);
+                            }
+                        });
+                    phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
                 }, scheduler);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 7a22bb1..b2c2bbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -36,8 +36,10 @@ import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
+
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -342,4 +344,102 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
         }
     }
+
+    @Test
+    public void testBatchMessageIdsDontChange() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1")
+                               .setContent("my-message-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-3".getBytes()).build()).get();
+        }
+
+        // Read messages before compaction to get ids
+        List<Message> messages = new ArrayList<>();
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+             .subscriptionName("sub1").readCompacted(true).subscribe()) {
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+        }
+
+        // Ensure all messages are in same batch
+        Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+                            ((BatchMessageIdImpl)messages.get(1).getMessageId()).getLedgerId());
+        Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getLedgerId(),
+                            ((BatchMessageIdImpl)messages.get(2).getMessageId()).getLedgerId());
+        Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+                            ((BatchMessageIdImpl)messages.get(1).getMessageId()).getEntryId());
+        Assert.assertEquals(((BatchMessageIdImpl)messages.get(0).getMessageId()).getEntryId(),
+                            ((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // Check that messages after compaction have same ids
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key1");
+            Assert.assertEquals(new String(message1.getData()), "my-message-1");
+            Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key2");
+            Assert.assertEquals(new String(message2.getData()), "my-message-3");
+            Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+        }
+    }
+
+    @Test
+    public void testWholeBatchCompactedOut() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create();
+             Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key1")
+                                    .setContent("my-message-1".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key1")
+                                    .setContent("my-message-2".getBytes()).build());
+            producerBatch.sendAsync(MessageBuilder.create()
+                                    .setKey("key1")
+                                    .setContent("my-message-3".getBytes()).build()).get();
+            producerNormal.sendAsync(MessageBuilder.create()
+                                     .setKey("key1")
+                                     .setContent("my-message-4".getBytes()).build()).get();
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message = consumer.receive();
+            Assert.assertEquals(message.getKey(), "key1");
+            Assert.assertEquals(new String(message.getData()), "my-message-4");
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to