This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4d8c0f2d054c03c80ceb625097db5ddcc6ddf8c7 Author: Yang Yang <[email protected]> AuthorDate: Mon Feb 10 09:37:59 2020 +0800 [Issue 6173][compaction] Fix log compaction for flow control/empty topic/last deletion (#6237) Fixes https://github.com/apache/pulsar/issues/6173 ### Motivation Fixes problems with log compaction found in issue https://github.com/apache/pulsar/issues/6173 : 1. Compaction fails for an empty topic. 2. Compaction never ends if the value of the last message is an empty batch message when the compaction is triggered. 3. Compaction fails for a topic with batch messages because RawReader flow control doesn't handle batch messages properly. ### Modifications 1. Check whether any message is available before the compaction phases, and finish the compaction immediately if there are no messages to read, to avoid a timeout exception. 2. Add the missing check for an empty batch message to the condition that ends the phase 2 loop. 3. Increase the available permits in RawConsumer by the correct amount for batch messages. ### Verifying this change The corresponding tests now produce messages in both batch and non-batch mode. 
--- .../org/apache/pulsar/client/api/RawReader.java | 7 +++ .../apache/pulsar/client/impl/RawReaderImpl.java | 18 +++++- .../pulsar/compaction/TwoPhaseCompactor.java | 69 +++++++++++++--------- .../apache/pulsar/client/impl/RawReaderTest.java | 36 ++++++++++- .../apache/pulsar/compaction/CompactionTest.java | 18 ++++-- .../apache/pulsar/compaction/CompactorTest.java | 2 +- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 7 files changed, 115 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index f9d297f..caf44ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -45,6 +45,13 @@ public interface RawReader { String getTopic(); /** + * Check if there is any message available to read. + * + * @return a completable future which will return whether there is any message available to read. + */ + CompletableFuture<Boolean> hasMessageAvailableAsync(); + + /** * Seek to a location in the topic. After the seek, the first message read will be the one with * with the specified message ID. 
* @param messageId the message ID to seek to diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 44dd93f..3ca072d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -33,9 +33,11 @@ import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +70,11 @@ public class RawReaderImpl implements RawReader { } @Override + public CompletableFuture<Boolean> hasMessageAvailableAsync() { + return consumer.hasMessageAvailableAsync(); + } + + @Override public CompletableFuture<Void> seekAsync(MessageId messageId) { return consumer.seekAsync(messageId); } @@ -133,6 +140,15 @@ public class RawReaderImpl implements RawReader { if (future == null) { assert(messageAndCnx == null); } else { + int numMsg; + try { + MessageMetadata msgMetadata = Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload()); + numMsg = msgMetadata.getNumMessagesInBatch(); + msgMetadata.recycle(); + } catch (Throwable t) { + // TODO message validation + numMsg = 1; + } if (!future.complete(messageAndCnx.msg)) { messageAndCnx.msg.close(); closeAsync(); @@ -140,7 +156,7 @@ public class RawReaderImpl implements RawReader { ClientCnx currentCnx = cnx(); if 
(currentCnx == messageAndCnx.cnx) { - increaseAvailablePermits(currentCnx); + increaseAvailablePermits(currentCnx, numMsg); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 62503ff..06afe93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -72,8 +72,16 @@ public class TwoPhaseCompactor extends Compactor { @Override protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) { - return phaseOne(reader).thenCompose( - (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader).thenCompose( + (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); } private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) { @@ -233,36 +241,43 @@ public class TwoPhaseCompactor extends Compactor { messageToAdd = Optional.of(m); } else { m.close(); - // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not - // present under latestForKey. Complete the compaction. 
- if (to.equals(id)) { - promise.complete(null); - } } } - messageToAdd.ifPresent((toAdd) -> { - try { - outstanding.acquire(); - CompletableFuture<Void> addFuture = addToCompactedLedger(lh, toAdd) + if (messageToAdd.isPresent()) { + try { + outstanding.acquire(); + CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get()) .whenComplete((res, exception2) -> { - outstanding.release(); - if (exception2 != null) { - promise.completeExceptionally(exception2); - } - }); - if (to.equals(id)) { - addFuture.whenComplete((res, exception2) -> { - if (exception2 == null) { - promise.complete(null); - } - }); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(ie); + outstanding.release(); + if (exception2 != null) { + promise.completeExceptionally(exception2); + } + }); + if (to.equals(id)) { + addFuture.whenComplete((res, exception2) -> { + if (exception2 == null) { + promise.complete(null); + } + }); } - }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(ie); + } + } else if (to.equals(id)) { + // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not + // present under latestForKey. Complete the compaction. 
+ try { + // make sure all inflight writes have finished + outstanding.acquire(MAX_OUTSTANDING); + promise.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(e); + } + return; + } phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); }, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index e57e88d..b0c7cd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -74,10 +74,16 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } private Set<String> publishMessages(String topic, int count) throws Exception { + return publishMessages(topic, count, false); + } + + private Set<String> publishMessages(String topic, int count, boolean batching) throws Exception { Set<String> keys = new HashSet<>(); try (Producer<byte[]> producer = pulsarClient.newProducer() - .enableBatching(false) + .enableBatching(batching) + // easier to create enough batches with a small batch size + .batchingMaxMessages(10) .messageRoutingMode(MessageRoutingMode.SinglePartition) .maxPendingMessages(count) .topic(topic) @@ -234,6 +240,34 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } @Test + public void testFlowControlBatch() throws Exception { + int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; + String topic = "persistent://my-property/my-ns/my-raw-topic"; + + publishMessages(topic, numMessages, true); + + RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); + Set<String> keys = new HashSet<>(); + + while (true) { + try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { + Assert.assertTrue(RawBatchConverter.isReadableBatch(m)); + List<ImmutablePair<MessageId, String>> 
batchKeys = RawBatchConverter.extractIdsAndKeys(m); + // Assert each key is unique + for (ImmutablePair<MessageId, String> pair : batchKeys) { + String key = pair.right; + Assert.assertTrue( + keys.add(key), + "Received duplicated key '" + key + "' : already received keys = " + keys); + } + } catch (TimeoutException te) { + break; + } + } + Assert.assertEquals(keys.size(), numMessages); + } + + @Test public void testBatchingExtractKeysAndIds() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index a5716b3..a54ff4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class CompactionTest extends MockedPulsarServiceBaseTest { @@ -1250,11 +1251,16 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } - @Test(timeOut = 20000) - public void testCompactionWithLastDeletedKey() throws Exception { + @DataProvider(name = "lastDeletedBatching") + public static Object[][] lastDeletedBatching() { + return new Object[][] {{true}, {false}}; + } + + @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") + public void testCompactionWithLastDeletedKey(boolean batching) throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -1277,11 +1283,11 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } - @Test(timeOut = 20000) - public void testEmptyCompactionLedger() throws Exception { + @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") + public void testEmptyCompactionLedger(boolean batching) throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index f418bc5..130793d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -207,7 +207,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a")); } - @Test(expectedExceptions = ExecutionException.class) + @Test public void testCompactEmptyTopic() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 3ebadf1..5b62248 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1142,7 +1142,7 @@ public 
class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle increaseAvailablePermits(currentCnx, 1); } - private void increaseAvailablePermits(ClientCnx currentCnx, int delta) { + protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) { int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); while (available >= receiverQueueRefillThreshold && !paused) {
