This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c75018a6db7 [fix][client] Fix building broken batched message when publishing (#24061) c75018a6db7 is described below commit c75018a6db712493de33664d78ced31f4e0e7094 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sat Mar 15 05:14:12 2025 +0800 [fix][client] Fix building broken batched message when publishing (#24061) --- .../common/protocol/ProducerBatchSendTest.java | 141 +++++++++++++++++++++ .../client/impl/BatchMessageContainerBase.java | 5 + .../client/impl/BatchMessageContainerImpl.java | 14 +- .../client/impl/BatchMessageKeyBasedContainer.java | 7 + .../apache/pulsar/client/impl/ProducerImpl.java | 5 +- .../apache/pulsar/common/protocol/Commands.java | 3 +- 6 files changed, 169 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java new file mode 100644 index 00000000000..552afbf085c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol; + +import static org.mockito.Mockito.doAnswer; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test +public class ProducerBatchSendTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public Object[][] flushSend() { + return new Object[][] { + {Collections.emptyList()}, + {Arrays.asList(1)}, + {Arrays.asList(2)}, + {Arrays.asList(3)}, + {Arrays.asList(1, 2)}, + {Arrays.asList(2, 3)}, + {Arrays.asList(1, 2, 3)}, + }; + } + + @Test(timeOut = 30_000, dataProvider = "flushSend") + public void testNoEnoughMemSend(List<Integer> flushSend) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(true) + .batchingMaxMessages(Integer.MAX_VALUE).batchingMaxPublishDelay(1, TimeUnit.HOURS).create(); + + /** + * The method {@link org.apache.pulsar.client.impl.BatchMessageContainerImpl#createOpSendMsg} may fail due to + * many errors, such like allocate more memory failed when calling + * {@link Commands#serializeCommandSendWithSize}. We mock an error here. + */ + AtomicBoolean failure = new AtomicBoolean(true); + BaseCommand threadLocalBaseCommand = Commands.LOCAL_BASE_COMMAND.get(); + BaseCommand spyBaseCommand = spy(threadLocalBaseCommand); + doAnswer(invocation -> { + if (failure.get()) { + throw new RuntimeException("mocked exception"); + } else { + return invocation.callRealMethod(); + } + }).when(spyBaseCommand).setSend(); + Commands.LOCAL_BASE_COMMAND.set(spyBaseCommand); + + // Failed sending 3 times. + producer.sendAsync("1"); + if (flushSend.contains(1)) { + producer.flushAsync(); + } + producer.sendAsync("2"); + if (flushSend.contains(2)) { + producer.flushAsync(); + } + producer.sendAsync("3"); + if (flushSend.contains(3)) { + producer.flushAsync(); + } + // Publishing is finished eventually. + failure.set(false); + producer.flush(); + Awaitility.await().untilAsserted(() -> { + assertTrue(admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog() > 0); + }); + + // Verify: all messages can be consumed. + ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subscription).subscribe(); + Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + assertEquals(msg1.getValue(), "1"); + Message<String> msg2 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), "2"); + Message<String> msg3 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg3); + assertEquals(msg3.getValue(), "3"); + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topic, false); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java index ddbe1bc2557..ee9e8262275 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java @@ -89,4 +89,9 @@ public interface BatchMessageContainerBase extends BatchMessageContainer { * @return the timestamp in nanoseconds or 0L if the batch container is empty */ long getFirstAddedTimestamp(); + + /** + * Clear the container's payload if build {@link OpSendMsg} failed. + */ + void resetPayloadAfterFailedPublishing(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 9e0eeafc478..489a3752332 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -162,13 +162,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } catch (Throwable th) { // serializing batch message can corrupt the index of message and batch-message. Reset the index so, // next iteration doesn't send corrupt message to broker. - for (int j = 0; j <= i; j++) { - MessageImpl<?> previousMsg = messages.get(j); - previousMsg.getDataBuffer().resetReaderIndex(); - } batchedMessageMetadataAndPayload.writerIndex(batchWriteIndex); batchedMessageMetadataAndPayload.readerIndex(batchReadIndex); throw new RuntimeException(th); + } finally { + msg.getDataBuffer().resetReaderIndex(); } } @@ -343,6 +341,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { return op; } + @Override + public void resetPayloadAfterFailedPublishing() { + if (batchedMessageMetadataAndPayload != null) { + batchedMessageMetadataAndPayload.readerIndex(0); + batchedMessageMetadataAndPayload.writerIndex(0); + } + } + protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) { int delta = updatedSizeBytes - batchAllocatedSizeBytes; batchAllocatedSizeBytes = updatedSizeBytes; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index 1592d3cae6c..f6e3f5a683e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -127,6 +127,13 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { } } + @Override + public void resetPayloadAfterFailedPublishing() { + for (BatchMessageContainerImpl batch : batches.values()) { + batch.resetPayloadAfterFailedPublishing(); + } + } + @Override public boolean hasSameSchema(MessageImpl<?> msg) { String key = getKey(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 759145f376d..2752dce945f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2353,7 +2353,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne processOpSendMsg(opSendMsg); } } catch (Throwable t) { - log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t); + // Since there is a uncompleted payload was built, we should reset it. + batchMessageContainer.resetPayloadAfterFailedPublishing(); + log.warn("[{}] [{}] Failed to create batch message for sending. Batch payloads have been reset and" + + " messages will be retried in subsequent batches.", topic, producerName, t); } finally { if (shouldScheduleNextBatchFlush) { maybeScheduleBatchFlushTask(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 95053e5e7e0..2cb4f9a40e3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -130,7 +130,8 @@ public class Commands { public static final short magicBrokerEntryMetadata = 0x0e02; private static final int checksumSize = 4; - private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() { + @VisibleForTesting + static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() { @Override protected BaseCommand initialValue() throws Exception { return new BaseCommand();