This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 43cd22e632d1938b2c5ee7cd860fc46d20fed921 Author: Yunze Xu <[email protected]> AuthorDate: Fri May 13 20:18:42 2022 +0800 [Java Client] Fix wrong behavior of deduplication for key based batching (#15413) ### Motivation Currently message deduplication doesn't work well for key based batching. First, the key based batch container doesn't update `lastSequenceIdPushed`, so a batch could contain both duplicated and non-duplicated messages. Second, when `createOpSendMsgs` is called, the `OpSendMsg` objects are sorted by their lowest sequence ids, and the highest sequence id is not set. If a batch contains sequence ids 0, 1, 2, then a later duplicate of the message with sequence id 1 or 2 won't be dropped. ### Modifications - Refactor the key based batch container so that `BatchMessageContainerImpl` is reused instead of maintaining a separate `KeyedBatch` class. - When `createOpSendMsgs` is called, clear the highest sequence id field and configure the sequence id field with the highest sequence id to fix the second issue described above. - Add `testKeyBasedBatchingOrder` to show and verify the current behavior. - Add a key based batching case to `testProducerDeduplicationWithDiscontinuousSequenceId` to verify that `lastSequenceIdPushed` is updated correctly. 
(cherry picked from commit a77333705ffb352da39767d53975353bb4f8864e) --- .../pulsar/client/api/ClientDeduplicationTest.java | 98 +++++++++- .../client/impl/BatchMessageContainerImpl.java | 27 ++- .../client/impl/BatchMessageKeyBasedContainer.java | 211 +++++---------------- 3 files changed, 155 insertions(+), 181 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 304bb6eaaa0..52017444a2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -20,19 +20,37 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "flaky") public class ClientDeduplicationTest extends ProducerConsumerBase { + @DataProvider + public static Object[][] batchingTypes() { + return new Object[][] { + { BatcherBuilder.DEFAULT }, + { BatcherBuilder.KEY_BASED } + }; + } + @BeforeClass @Override protected void setup() throws Exception { @@ -46,7 +64,7 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { 
super.internalCleanup(); } - @Test + @Test(priority = -1) public void testNamespaceDeduplicationApi() throws Exception { final String namespace = "my-property/my-ns"; assertNull(admin.namespaces().getDeduplicationStatus(namespace)); @@ -174,9 +192,10 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { producer.close(); } - @Test(timeOut = 30000) - public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception { - String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId"; + @Test(timeOut = 30000, dataProvider = "batchingTypes") + public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception { + String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-" + + System.currentTimeMillis(); admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); // Set infinite timeout @@ -185,7 +204,9 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { .topic(topic) .producerName("my-producer-name") .enableBatching(true) + .batcherBuilder(batcherBuilder) .batchingMaxMessages(10) + .batchingMaxPublishDelay(1L, TimeUnit.HOURS) .sendTimeout(0, TimeUnit.SECONDS); Producer<byte[]> producer = producerBuilder.create(); @@ -208,7 +229,8 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { producer.flush(); for (int i = 0; i < 4; i++) { - Message<byte[]> msg = consumer.receive(); + Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); assertEquals(new String(msg.getData()), "my-message-" + i); consumer.acknowledge(msg); } @@ -284,4 +306,68 @@ public class ClientDeduplicationTest extends ProducerConsumerBase { producer.close(); } + + @Test(timeOut = 30000) + public void testKeyBasedBatchingOrder() throws Exception { + final String topic = "persistent://my-property/my-ns/test-key-based-batching-order"; + 
admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); + + final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscribe(); + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .batcherBuilder(BatcherBuilder.KEY_BASED) + .batchingMaxMessages(100) + .batchingMaxBytes(1024 * 1024 * 5) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create(); + // | key | sequence id list | + // | :-- | :--------------- | + // | A | 0, 3, 4 | + // | B | 1, 2 | + final List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync()); + sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync()); + sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync()); + sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync()); + sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync()); + // The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but: + // 1. The sequence ids for a given key are ordered. + // 2. The highest sequence ids of batches are ordered. 
+ producer.flush(); + + FutureUtil.waitForAll(sendFutures); + final List<MessageId> sendMessageIds = sendFutures.stream().map(CompletableFuture::join) + .collect(Collectors.toList()); + for (int i = 0; i < sendMessageIds.size(); i++) { + log.info("Send msg-{} to {}", i, sendMessageIds.get(i)); + } + + final List<Long> sequenceIdList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS); + if (msg == null) { + break; + } + log.info("Received {}, key: {}, seq id: {}, msg id: {}", + msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId()); + assertNotNull(msg); + sequenceIdList.add(msg.getSequenceId()); + } + assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L)); + + for (int i = 0; i < 5; i++) { + // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned. + final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send(); + assertTrue(messageId instanceof BatchMessageIdImpl); + final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId; + assertEquals(messageIdImpl.getLedgerId(), -1L); + assertEquals(messageIdImpl.getEntryId(), -1L); + } + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index cea567e8d68..5811db76a64 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -21,11 +21,12 @@ package org.apache.pulsar.client.impl; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; - +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.util.Arrays; import java.util.List; - +import lombok.Getter; +import lombok.Setter; import 
org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -49,7 +50,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { private MessageMetadata messageMetadata = new MessageMetadata(); // sequence id for this batch which will be persisted as a single entry by broker + @Getter + @Setter private long lowestSequenceId = -1L; + @Getter + @Setter private long highestSequenceId = -1L; private ByteBuf batchedMessageMetadataAndPayload; private List<MessageImpl<?>> messages = Lists.newArrayList(); @@ -57,6 +62,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { // keep track of callbacks for individual messages being published in a batch protected SendCallback firstCallback; + public BatchMessageContainerImpl() { + } + + public BatchMessageContainerImpl(ProducerImpl<?> producer) { + this(); + setProducer(producer); + } + @Override public boolean add(MessageImpl<?> msg, SendCallback callback) { @@ -82,10 +95,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } } catch (Throwable e) { log.error("construct first message failed, exception is ", e); - if (batchedMessageMetadataAndPayload != null) { - // if payload has been allocated release it - batchedMessageMetadataAndPayload.release(); - } discard(new PulsarClientException(e)); return false; } @@ -104,7 +113,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } highestSequenceId = msg.getSequenceId(); ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId())); - return isBatchFull(); } @@ -172,6 +180,10 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { if (firstCallback != null) { firstCallback.sendComplete(ex); } + if (batchedMessageMetadataAndPayload != null) { + ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload); 
+ batchedMessageMetadataAndPayload = null; + } } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, lowestSequenceId, t); @@ -193,6 +205,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { return null; } messageMetadata.setNumMessagesInBatch(numMessagesInBatch); + messageMetadata.setSequenceId(lowestSequenceId); messageMetadata.setHighestSequenceId(highestSequenceId); if (currentTxnidMostBits != -1) { messageMetadata.setTxnidMostBits(currentTxnidMostBits); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index 505ca75743c..77990eeeacb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -18,27 +18,12 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Lists; - -import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; -import org.apache.pulsar.common.api.proto.CompressionType; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.compression.CompressionCodec; -import org.apache.pulsar.common.protocol.ByteBufPair; -import org.apache.pulsar.common.protocol.Commands; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +38,7 @@ import org.slf4j.LoggerFactory; */ class 
BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { - private Map<String, KeyedBatch> batches = new HashMap<>(); + private final Map<String, BatchMessageContainerImpl> batches = new HashMap<>(); @Override public boolean add(MessageImpl<?> msg, SendCallback callback) { @@ -61,29 +46,16 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName, numMessagesInBatch); } - numMessagesInBatch++; - currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); String key = getKey(msg); - KeyedBatch part = batches.get(key); - if (part == null) { - part = new KeyedBatch(); - part.addMsg(msg, callback); - part.compressionType = compressionType; - part.compressor = compressor; - part.maxBatchSize = maxBatchSize; - part.topicName = topicName; - part.producerName = producerName; - batches.putIfAbsent(key, part); - - if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) { - currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits(); - } - if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) { - currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits(); - } - - } else { - part.addMsg(msg, callback); + final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key, + __ -> new BatchMessageContainerImpl(producer)); + batchMessageContainer.add(msg, callback); + // The `add` method fails iff the container is empty, i.e. the `msg` is the first message to add, while `msg` + // was failed to add. In this case, `clear` method will be called and the batch container is empty and there is + // no need to update the stats. 
+ if (!batchMessageContainer.isEmpty()) { + numMessagesInBatch++; + currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); } return isBatchFull(); } @@ -92,7 +64,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { public void clear() { numMessagesInBatch = 0; currentBatchSizeBytes = 0; - batches = new HashMap<>(); + batches.clear(); currentTxnidMostBits = -1L; currentTxnidLeastBits = -1L; } @@ -104,13 +76,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { @Override public void discard(Exception ex) { - try { - // Need to protect ourselves from any exception being thrown in the future handler from the application - batches.forEach((k, v) -> v.firstCallback.sendComplete(ex)); - } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t); - } - batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload)); + batches.forEach((k, v) -> v.discard(ex)); clear(); } @@ -119,64 +85,45 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { return true; } - private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException { - ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload()); - if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { - keyedBatch.discard(new PulsarClientException.InvalidMessageException( - "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); - return null; - } - - final int numMessagesInBatch = keyedBatch.messages.size(); - long currentBatchSizeBytes = 0; - for (MessageImpl<?> message : keyedBatch.messages) { - currentBatchSizeBytes += message.getDataBuffer().readableBytes(); - } - keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch); - if (currentTxnidMostBits != -1) { - 
keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits); - } - if (currentTxnidLeastBits != -1) { - keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits); - } - ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch, - keyedBatch.messageMetadata, encryptedPayload); - - ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback); - - op.setNumMessagesInBatch(numMessagesInBatch); - op.setBatchSizeByte(currentBatchSizeBytes); - return op; - } - @Override public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException { - List<ProducerImpl.OpSendMsg> result = new ArrayList<>(); - List<KeyedBatch> list = new ArrayList<>(batches.values()); - list.sort(((o1, o2) -> ComparisonChain.start() - .compare(o1.sequenceId, o2.sequenceId) - .result())); - for (KeyedBatch keyedBatch : list) { - ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch); - if (op != null) { - result.add(op); + try { + // In key based batching, the sequence ids might not be ordered, for example, + // | key | sequence id list | + // | :-- | :--------------- | + // | A | 0, 3, 4 | + // | B | 1, 2 | + // The message order should be 1, 2, 0, 3, 4 so that a message with a sequence id <= 4 should be dropped. + // However, for a MessageMetadata with both `sequence_id` and `highest_sequence_id` fields, the broker will + // expect a strict order so that the batch of key "A" (0, 3, 4) will be dropped. + // Therefore, we should update the `sequence_id` field to the highest sequence id and remove the + // `highest_sequence_id` field to allow the weak order. 
+ batches.values().forEach(batchMessageContainer -> { + batchMessageContainer.setLowestSequenceId(batchMessageContainer.getHighestSequenceId()); + }); + return batches.values().stream().sorted((o1, o2) -> + (int) (o1.getLowestSequenceId() - o2.getLowestSequenceId()) + ).map(batchMessageContainer -> { + try { + return batchMessageContainer.createOpSendMsg(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + }).collect(Collectors.toList()); + } catch (IllegalStateException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw e; } } - return result; } @Override public boolean hasSameSchema(MessageImpl<?> msg) { String key = getKey(msg); - KeyedBatch part = batches.get(key); - if (part == null || part.messages.isEmpty()) { - return true; - } - if (!part.messageMetadata.hasSchemaVersion()) { - return msg.getSchemaVersion() == null; - } - return Arrays.equals(msg.getSchemaVersion(), - part.messageMetadata.getSchemaVersion()); + BatchMessageContainerImpl batchMessageContainer = batches.get(key); + return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg); } private String getKey(MessageImpl<?> msg) { @@ -186,78 +133,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { return msg.getKey(); } - private static class KeyedBatch { - private final MessageMetadata messageMetadata = new MessageMetadata(); - // sequence id for this batch which will be persisted as a single entry by broker - private long sequenceId = -1; - private ByteBuf batchedMessageMetadataAndPayload; - private List<MessageImpl<?>> messages = Lists.newArrayList(); - private SendCallback previousCallback = null; - private CompressionType compressionType; - private CompressionCodec compressor; - private int maxBatchSize; - private String topicName; - private String producerName; - - // keep track of callbacks for individual messages being published in a batch - private SendCallback 
firstCallback; - - private ByteBuf getCompressedBatchMetadataAndPayload() { - for (MessageImpl<?> msg : messages) { - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(), - msg.getDataBuffer(), batchedMessageMetadataAndPayload); - } - int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); - batchedMessageMetadataAndPayload.release(); - if (compressionType != CompressionType.NONE) { - messageMetadata.setCompression(compressionType); - messageMetadata.setUncompressedSize(uncompressedSize); - } - - // Update the current max batch size using the uncompressed size, which is what we need in any case to - // accumulate the batch content - maxBatchSize = Math.max(maxBatchSize, uncompressedSize); - return compressedPayload; - } - - private void addMsg(MessageImpl<?> msg, SendCallback callback) { - if (messages.size() == 0) { - sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); - batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); - firstCallback = callback; - } - if (previousCallback != null) { - previousCallback.addCallback(msg, callback); - } - previousCallback = callback; - messages.add(msg); - } - - public void discard(Exception ex) { - try { - // Need to protect ourselves from any exception being thrown in the future handler from the application - if (firstCallback != null) { - firstCallback.sendComplete(ex); - } - } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, - sequenceId, t); - } - clear(); - } - - public void clear() { - messages = Lists.newArrayList(); - firstCallback = null; - previousCallback = null; - messageMetadata.clear(); - sequenceId = -1; - batchedMessageMetadataAndPayload = null; - } - } - private 
static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class); }
