merlimat closed pull request #1336: CompactedTopic should seek to position of 
cursor, not next position
URL: https://github.com/apache/incubator-pulsar/pull/1336
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
once it has been merged, so the diff is reproduced below:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e70381edb..b1378b648 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -133,11 +133,11 @@ private static void findStartPointLoop(PositionImpl p, 
long start, long end,
 
         CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
                 () -> {
-                    if (comparePositionAndMessageId(p, startEntry.join()) < 0) 
{
+                    if (comparePositionAndMessageId(p, startEntry.join()) <= 
0) {
                         promise.complete(start);
-                    } else if (comparePositionAndMessageId(p, 
middleEntry.join()) < 0) {
+                    } else if (comparePositionAndMessageId(p, 
middleEntry.join()) <= 0) {
                         findStartPointLoop(p, start, midpoint, promise, cache);
-                    } else if (comparePositionAndMessageId(p, endEntry.join()) 
< 0) {
+                    } else if (comparePositionAndMessageId(p, endEntry.join()) 
<= 0) {
                         findStartPointLoop(p, midpoint + 1, end, promise, 
cache);
                     } else {
                         promise.complete(NEWER_THAN_COMPACTED);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index eb549fa44..69d9cfbc7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -52,7 +52,7 @@
 import lombok.Cleanup;
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
-    private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
+    private final Random r = new Random(0);
 
     @BeforeMethod
     @Override
@@ -77,9 +77,8 @@ public void cleanup() throws Exception {
      * entries in the ledger, and a list of gaps, and the entry which should 
be returned after the gap.
      */
     private Triple<Long, List<Pair<MessageIdData,Long>>, 
List<Pair<MessageIdData,Long>>>
-        buildCompactedLedger(BookKeeper bk, int seed, int count)
+        buildCompactedLedger(BookKeeper bk, int count)
             throws Exception {
-        Random r = new Random(seed);
         LedgerHandle lh = bk.createLedger(1, 1,
                                           
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                                           
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
@@ -112,10 +111,12 @@ public void cleanup() throws Exception {
                             .setEntryId(entryIds.addAndGet(delta + 1)).build();
 
                         @Cleanup
-                        RawMessage m = new RawMessageImpl(id, emptyBuffer);
+                        RawMessage m = new RawMessageImpl(id, 
Unpooled.EMPTY_BUFFER);
 
                         CompletableFuture<Void> f = new CompletableFuture<>();
-                        lh.asyncAddEntry(m.serialize(),
+                        ByteBuf buffer = m.serialize();
+
+                        lh.asyncAddEntry(buffer,
                                 (rc, ledger, eid, ctx) -> {
                                      if (rc != BKException.Code.OK) {
                                          
f.completeExceptionally(BKException.create(rc));
@@ -125,6 +126,7 @@ public void cleanup() throws Exception {
                                          f.complete(null);
                                      }
                                 }, null);
+                        buffer.release();
                         return f;
                     }).toArray(CompletableFuture[]::new)).get();
         lh.close();
@@ -138,7 +140,7 @@ public void testEntryLookup() throws Exception {
                 this.conf, null);
 
         Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, 
Long>>> compactedLedgerData
-            = buildCompactedLedger(bk, 0, 500);
+            = buildCompactedLedger(bk, 500);
 
         List<Pair<MessageIdData, Long>> positions = 
compactedLedgerData.getMiddle();
         List<Pair<MessageIdData, Long>> idsInGaps = 
compactedLedgerData.getRight();
@@ -170,19 +172,14 @@ public void testEntryLookup() throws Exception {
                             
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
 
         // shuffle to make cache work hard
-        Collections.shuffle(positions);
-        Collections.shuffle(idsInGaps);
+        Collections.shuffle(positions, r);
+        Collections.shuffle(idsInGaps, r);
 
         // Check ids we know are in compacted ledger
         for (Pair<MessageIdData, Long> p : positions) {
             PositionImpl pos = new PositionImpl(p.getLeft().getLedgerId(), 
p.getLeft().getEntryId());
-            if (p.equals(lastPosition)) {
-                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, 
lastEntryId, cache).get(),
-                                    
Long.valueOf(CompactedTopicImpl.NEWER_THAN_COMPACTED));
-            } else {
-                Assert.assertEquals(CompactedTopicImpl.findStartPoint(pos, 
lastEntryId, cache).get(),
-                            Long.valueOf(p.getRight() + 1));
-            }
+            Long got = CompactedTopicImpl.findStartPoint(pos, lastEntryId, 
cache).get();
+            Assert.assertEquals(got, Long.valueOf(p.getRight()));
         }
 
         // Check ids we know are in the gaps of the compacted ledger
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6a175b277..7a22bb1ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -294,4 +294,52 @@ public void testCompactEmptyTopic() throws Exception {
             Assert.assertEquals(m.getData(), "content0".getBytes());
         }
     }
+
+    @Test
+    public void testFirstMessageRetained() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        // subscribe before sending anything, so that we get all messages
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+            .readCompacted(true).subscribe().close();
+
+        try (Producer producer = pulsarClient.createProducer(topic)) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1")
+                               .setContent("my-message-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               .setContent("my-message-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2")
+                               
.setContent("my-message-3".getBytes()).build()).get();
+        }
+
+        // Read messages before compaction to get ids
+        List<Message> messages = new ArrayList<>();
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()) {
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+            messages.add(consumer.receive());
+        }
+
+        // compact the topic
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // Check that messages after compaction have same ids
+        try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").readCompacted(true).subscribe()){
+            Message message1 = consumer.receive();
+            Assert.assertEquals(message1.getKey(), "key1");
+            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
+            Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+
+            Message message2 = consumer.receive();
+            Assert.assertEquals(message2.getKey(), "key2");
+            Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
+            Assert.assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to