This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dad679a  CompactedTopic should seek to position of cursor, not next position (#1336)
dad679a is described below

commit dad679ae1392bef4be67c654bcb7d19de79be266
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 7 19:25:26 2018 +0100

    CompactedTopic should seek to position of cursor, not next position (#1336)
    
    * CompactedTopic should seek to position of cursor, not next position
    
    When finding the first ledger entry to read from while reading from a
    compacted topic ledger, we should find the position represented by the
    cursor read position, not the position after it, as has been done
    until now.
    
    This bug was due to a misunderstanding of how the read position
    works. It was assumed it worked the same as the mark-delete position,
    which is not the case.
    
    * Fix clash on message cleanup
---
 .../pulsar/compaction/CompactedTopicImpl.java      |  6 +--
 .../pulsar/compaction/CompactedTopicTest.java      | 27 ++++++------
 .../apache/pulsar/compaction/CompactionTest.java   | 48 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e70381e..b1378b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -133,11 +133,11 @@ public class CompactedTopicImpl implements CompactedTopic 
{
 
         CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
                 () -> {
-                    if (comparePositionAndMessageId(p, startEntry.join()) < 0) 
{
+                    if (comparePositionAndMessageId(p, startEntry.join()) <= 
0) {
                         promise.complete(start);
-                    } else if (comparePositionAndMessageId(p, 
middleEntry.join()) < 0) {
+                    } else if (comparePositionAndMessageId(p, 
middleEntry.join()) <= 0) {
                         findStartPointLoop(p, start, midpoint, promise, cache);
-                    } else if (comparePositionAndMessageId(p, endEntry.join()) 
< 0) {
+                    } else if (comparePositionAndMessageId(p, endEntry.join()) 
<= 0) {
                         findStartPointLoop(p, midpoint + 1, end, promise, 
cache);
                     } else {
                         promise.complete(NEWER_THAN_COMPACTED);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index eb549fa..69d9cfb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -52,7 +52,7 @@ import io.netty.buffer.Unpooled;
 import lombok.Cleanup;
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
-    private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
+    private final Random r = new Random(0);
 
     @BeforeMethod
     @Override
@@ -77,9 +77,8 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
      * entries in the ledger, and a list of gaps, and the entry which should 
be returned after the gap.
      */
     private Triple<Long, List<Pair<MessageIdData,Long>>, 
List<Pair<MessageIdData,Long>>>
-        buildCompactedLedger(BookKeeper bk, int seed, int count)
+        buildCompactedLedger(BookKeeper bk, int count)
             throws Exception {
-        Random r = new Random(seed);
         LedgerHandle lh = bk.createLedger(1, 1,
                                           
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                                           
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
@@ -112,10 +111,12 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
                             .setEntryId(entryIds.addAndGet(delta + 1)).build();
 
                         @Cleanup
-                        RawMessage m = new RawMessageImpl(id, emptyBuffer);
+                        RawMessage m = new RawMessageImpl(id, 
Unpooled.EMPTY_BUFFER);
 
                         CompletableFuture<Void> f = new CompletableFuture<>();
-                        lh.asyncAddEntry(m.serialize(),
+                        ByteBuf buffer = m.serialize();
+
+                        lh.asyncAddEntry(buffer,
                                 (rc, ledger, eid, ctx) -> {
                                      if (rc != BKException.Code.OK) {
                                          
f.completeExceptionally(BKException.create(rc));
@@ -125,6 +126,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
                                          f.complete(null);
                                      }
                                 }, null);
+                        buffer.release();
                         return f;
                     }).toArray(CompletableFuture[]::new)).get();
         lh.close();
@@ -138,7 +140,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
                 this.conf, null);
 
         Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, 
Long>>> compactedLedgerData
-            = buildCompactedLedger(bk, 0, 500);
+            = buildCompactedLedger(bk, 500);
 
         List<Pair<MessageIdData, Long>> positions = 
compactedLedgerData.getMiddle();
         List<Pair<MessageIdData, Long>> idsInGaps = 
compactedLedgerData.getRight();
@@ -170,19 +172,14 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
                             
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
 
         // shuffle to make cache work hard
-        Collections.shuffle(positions);
-        Collections.shuffle(idsInGaps);
+        Collections.shuffle(positions, r);
+        Collections.shuffle(idsInGaps, r);
 
         // Check ids we know are in compacted ledger
         for (Pair<MessageIdData, Long> p : positions) {
             PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), 
p.getLeft().getEntryId());
-            if (p.equals(lastPosition)) {
-                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, 
lastEntryId, cache).get(),
-                                    
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
-            } else {
-                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, 
lastEntryId, cache).get(),
-                            Long.valueOf(p.getRight() + 1));
-            }
+            Long got = CompactedTopicImpl.findStartPoint(pos, lastEntryId, 
cache).get();
+            Assert.assertEquals(got, Long.valueOf(p.getRight()));
         }
 
         // Check ids we know are in the gaps of the compacted ledger
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6a175b2..7a22bb1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -294,4 +294,52 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(m.getData(), "content0".getBytes());
         }
     }
+
+    @Test
+    public void testFirstMessageRetained() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producer = pulsarClient.createProducer(topic)) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1")
+                               .setContent("my-message-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               
.setContent("my-message-3".getBytes()).build()).get();
+        }
+
+        // Read messages before compaction to get ids
+        List<Message> messages = new ArrayList<>();
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()) {
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // Check that messages after compaction have same ids
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key1");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key2");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            Assert.assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to