This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4d8c0f2d054c03c80ceb625097db5ddcc6ddf8c7
Author: Yang Yang <[email protected]>
AuthorDate: Mon Feb 10 09:37:59 2020 +0800

    [Issue 6173][compaction] Fix log compaction for flow control/empty 
topic/last deletion (#6237)
    
    Fixes https://github.com/apache/pulsar/issues/6173
    
    ### Motivation
    
    Fixes problems for log compaction found in issue 
https://github.com/apache/pulsar/issues/6173 :
    
    1. Compaction fails for an empty topic.
    2. Compaction never ends if the value of the last message is an empty batch 
message when the compaction is triggered.
    3. Compaction fails for a topic with batch messages because RawReader flow 
control doesn't handle batch messages properly.
    
    ### Modifications
    
    1. Check if any message is available before compaction phases, and finish 
the compaction immediately if there is no messages to read to avoid timeout 
exception.
    2. Add missing check for empty batch message for the condition to end the 
phase 2 loop.
    3. Increase correct number of available permits in RawConsumer for batch 
messages.
    
    ### Verifying this change
    
    Producing messages in both batch and not-batch mode in corresponding tests.
---
 .../org/apache/pulsar/client/api/RawReader.java    |  7 +++
 .../apache/pulsar/client/impl/RawReaderImpl.java   | 18 +++++-
 .../pulsar/compaction/TwoPhaseCompactor.java       | 69 +++++++++++++---------
 .../apache/pulsar/client/impl/RawReaderTest.java   | 36 ++++++++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 18 ++++--
 .../apache/pulsar/compaction/CompactorTest.java    |  2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 7 files changed, 115 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index f9d297f..caf44ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -45,6 +45,13 @@ public interface RawReader {
     String getTopic();
 
     /**
+     * Check if there is any message available to read.
+     *
+     * @return a completable future which will return whether there is any 
message available to read.
+     */
+    CompletableFuture<Boolean> hasMessageAvailableAsync();
+
+    /**
      * Seek to a location in the topic. After the seek, the first message read 
will be the one with
      * with the specified message ID.
      * @param messageId the message ID to seek to
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 44dd93f..3ca072d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -33,9 +33,11 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +70,11 @@ public class RawReaderImpl implements RawReader {
     }
 
     @Override
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        return consumer.hasMessageAvailableAsync();
+    }
+
+    @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         return consumer.seekAsync(messageId);
     }
@@ -133,6 +140,15 @@ public class RawReaderImpl implements RawReader {
             if (future == null) {
                 assert(messageAndCnx == null);
             } else {
+                int numMsg;
+                try {
+                    MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
+                    numMsg = msgMetadata.getNumMessagesInBatch();
+                    msgMetadata.recycle();
+                } catch (Throwable t) {
+                    // TODO message validation
+                    numMsg = 1;
+                }
                 if (!future.complete(messageAndCnx.msg)) {
                     messageAndCnx.msg.close();
                     closeAsync();
@@ -140,7 +156,7 @@ public class RawReaderImpl implements RawReader {
 
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
-                    increaseAvailablePermits(currentCnx);
+                    increaseAvailablePermits(currentCnx, numMsg);
                 }
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 62503ff..06afe93 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -72,8 +72,16 @@ public class TwoPhaseCompactor extends Compactor {
 
     @Override
     protected CompletableFuture<Long> doCompaction(RawReader reader, 
BookKeeper bk) {
-        return phaseOne(reader).thenCompose(
-                (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, 
r.latestForKey, bk));
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader).thenCompose(
+                                (r) -> phaseTwo(reader, r.from, r.to, 
r.lastReadId, r.latestForKey, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", 
reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L);
+                    }
+                });
     }
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -233,36 +241,43 @@ public class TwoPhaseCompactor extends Compactor {
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
-                            // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so, not
-                            // present under latestForKey. Complete the 
compaction.
-                            if (to.equals(id)) {
-                                promise.complete(null);
-                            }
                         }
                     }
 
-                    messageToAdd.ifPresent((toAdd) -> {
-                            try {
-                                outstanding.acquire();
-                                CompletableFuture<Void> addFuture = 
addToCompactedLedger(lh, toAdd)
+                    if (messageToAdd.isPresent()) {
+                        try {
+                            outstanding.acquire();
+                            CompletableFuture<Void> addFuture = 
addToCompactedLedger(lh, messageToAdd.get())
                                     .whenComplete((res, exception2) -> {
-                                            outstanding.release();
-                                            if (exception2 != null) {
-                                                
promise.completeExceptionally(exception2);
-                                            }
-                                        });
-                                if (to.equals(id)) {
-                                    addFuture.whenComplete((res, exception2) 
-> {
-                                            if (exception2 == null) {
-                                                promise.complete(null);
-                                            }
-                                        });
-                                }
-                            } catch (InterruptedException ie) {
-                                Thread.currentThread().interrupt();
-                                promise.completeExceptionally(ie);
+                                        outstanding.release();
+                                        if (exception2 != null) {
+                                            
promise.completeExceptionally(exception2);
+                                        }
+                                    });
+                            if (to.equals(id)) {
+                                addFuture.whenComplete((res, exception2) -> {
+                                    if (exception2 == null) {
+                                        promise.complete(null);
+                                    }
+                                });
                             }
-                        });
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            promise.completeExceptionally(ie);
+                        }
+                    } else if (to.equals(id)) {
+                        // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so, not
+                        // present under latestForKey. Complete the compaction.
+                        try {
+                            // make sure all inflight writes have finished
+                            outstanding.acquire(MAX_OUTSTANDING);
+                            promise.complete(null);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            promise.completeExceptionally(e);
+                        }
+                        return;
+                    }
                     phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise);
                 }, scheduler);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index e57e88d..b0c7cd1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -74,10 +74,16 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private Set<String> publishMessages(String topic, int count) throws 
Exception {
+        return publishMessages(topic, count, false);
+    }
+
+    private Set<String> publishMessages(String topic, int count, boolean 
batching) throws Exception {
         Set<String> keys = new HashSet<>();
 
         try (Producer<byte[]> producer = pulsarClient.newProducer()
-            .enableBatching(false)
+            .enableBatching(batching)
+            // easier to create enough batches with a small batch size
+            .batchingMaxMessages(10)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .maxPendingMessages(count)
             .topic(topic)
@@ -234,6 +240,34 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testFlowControlBatch() throws Exception {
+        int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+        String topic = "persistent://my-property/my-ns/my-raw-topic";
+
+        publishMessages(topic, numMessages, true);
+
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+        Set<String> keys = new HashSet<>();
+
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get(1, 
TimeUnit.SECONDS)) {
+                Assert.assertTrue(RawBatchConverter.isReadableBatch(m));
+                List<ImmutablePair<MessageId, String>> batchKeys = 
RawBatchConverter.extractIdsAndKeys(m);
+                // Assert each key is unique
+                for (ImmutablePair<MessageId, String> pair : batchKeys) {
+                    String key = pair.right;
+                    Assert.assertTrue(
+                            keys.add(key),
+                            "Received duplicated key '" + key + "' : already 
received keys = " + keys);
+                }
+            } catch (TimeoutException te) {
+                break;
+            }
+        }
+        Assert.assertEquals(keys.size(), numMessages);
+    }
+
+    @Test
     public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/my-ns/my-raw-topic";
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index a5716b3..a54ff4c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class CompactionTest extends MockedPulsarServiceBaseTest {
@@ -1250,11 +1251,16 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test(timeOut = 20000)
-    public void testCompactionWithLastDeletedKey() throws Exception {
+    @DataProvider(name = "lastDeletedBatching")
+    public static Object[][] lastDeletedBatching() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
+    public void testCompactionWithLastDeletedKey(boolean batching) throws 
Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -1277,11 +1283,11 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test(timeOut = 20000)
-    public void testEmptyCompactionLedger() throws Exception {
+    @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
+    public void testEmptyCompactionLedger(boolean batching) throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(batching)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 
         
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index f418bc5..130793d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -207,7 +207,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
     }
 
-    @Test(expectedExceptions = ExecutionException.class)
+    @Test
     public void testCompactEmptyTopic() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3ebadf1..5b62248 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1142,7 +1142,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         increaseAvailablePermits(currentCnx, 1);
     }
 
-    private void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
+    protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
         int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
 
         while (available >= receiverQueueRefillThreshold && !paused) {

Reply via email to