This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c75018a6db7 [fix][client] Fix building broken batched message when publishing (#24061)
c75018a6db7 is described below

commit c75018a6db712493de33664d78ced31f4e0e7094
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Sat Mar 15 05:14:12 2025 +0800

    [fix][client] Fix building broken batched message when publishing (#24061)
---
 .../common/protocol/ProducerBatchSendTest.java     | 141 +++++++++++++++++++++
 .../client/impl/BatchMessageContainerBase.java     |   5 +
 .../client/impl/BatchMessageContainerImpl.java     |  14 +-
 .../client/impl/BatchMessageKeyBasedContainer.java |   7 +
 .../apache/pulsar/client/impl/ProducerImpl.java    |   5 +-
 .../apache/pulsar/common/protocol/Commands.java    |   3 +-
 6 files changed, 169 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java
new file mode 100644
index 00000000000..552afbf085c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/protocol/ProducerBatchSendTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test
+public class ProducerBatchSendTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider
+    public Object[][] flushSend() {
+        return new Object[][] {
+                {Collections.emptyList()},
+                {Arrays.asList(1)},
+                {Arrays.asList(2)},
+                {Arrays.asList(3)},
+                {Arrays.asList(1, 2)},
+                {Arrays.asList(2, 3)},
+                {Arrays.asList(1, 2, 3)},
+        };
+    }
+
+    @Test(timeOut = 30_000, dataProvider = "flushSend")
+    public void testNoEnoughMemSend(List<Integer> flushSend) throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(true)
+                .batchingMaxMessages(Integer.MAX_VALUE).batchingMaxPublishDelay(1, TimeUnit.HOURS).create();
+
+        /**
+         * The method {@link org.apache.pulsar.client.impl.BatchMessageContainerImpl#createOpSendMsg} may fail due to
+         * many errors, such like allocate more memory failed when calling
+         * {@link Commands#serializeCommandSendWithSize}. We mock an error here.
+         */
+        AtomicBoolean failure = new AtomicBoolean(true);
+        BaseCommand threadLocalBaseCommand = Commands.LOCAL_BASE_COMMAND.get();
+        BaseCommand spyBaseCommand = spy(threadLocalBaseCommand);
+        doAnswer(invocation -> {
+            if (failure.get()) {
+                throw new RuntimeException("mocked exception");
+            } else {
+                return invocation.callRealMethod();
+            }
+        }).when(spyBaseCommand).setSend();
+        Commands.LOCAL_BASE_COMMAND.set(spyBaseCommand);
+
+        // Failed sending 3 times.
+        producer.sendAsync("1");
+        if (flushSend.contains(1)) {
+            producer.flushAsync();
+        }
+        producer.sendAsync("2");
+        if (flushSend.contains(2)) {
+            producer.flushAsync();
+        }
+        producer.sendAsync("3");
+        if (flushSend.contains(3)) {
+            producer.flushAsync();
+        }
+        // Publishing is finished eventually.
+        failure.set(false);
+        producer.flush();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog() > 0);
+        });
+
+        // Verify: all messages can be consumed.
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subscription).subscribe();
+        Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+        assertEquals(msg1.getValue(), "1");
+        Message<String> msg2 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg2);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg3);
+        assertEquals(msg3.getValue(), "3");
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
index ddbe1bc2557..ee9e8262275 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -89,4 +89,9 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
      * @return the timestamp in nanoseconds or 0L if the batch container is empty
      */
     long getFirstAddedTimestamp();
+
+    /**
+     * Clear the container's payload if build {@link OpSendMsg} failed.
+     */
+    void resetPayloadAfterFailedPublishing();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 9e0eeafc478..489a3752332 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -162,13 +162,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             } catch (Throwable th) {
                 // serializing batch message can corrupt the index of message and batch-message. Reset the index so,
                 // next iteration doesn't send corrupt message to broker.
-                for (int j = 0; j <= i; j++) {
-                    MessageImpl<?> previousMsg = messages.get(j);
-                    previousMsg.getDataBuffer().resetReaderIndex();
-                }
                 batchedMessageMetadataAndPayload.writerIndex(batchWriteIndex);
                 batchedMessageMetadataAndPayload.readerIndex(batchReadIndex);
                 throw new RuntimeException(th);
+            } finally {
+                msg.getDataBuffer().resetReaderIndex();
             }
         }
 
@@ -343,6 +341,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         return op;
     }
 
+    @Override
+    public void resetPayloadAfterFailedPublishing() {
+        if (batchedMessageMetadataAndPayload != null) {
+            batchedMessageMetadataAndPayload.readerIndex(0);
+            batchedMessageMetadataAndPayload.writerIndex(0);
+        }
+    }
+
     protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
         int delta = updatedSizeBytes - batchAllocatedSizeBytes;
         batchAllocatedSizeBytes = updatedSizeBytes;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 1592d3cae6c..f6e3f5a683e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -127,6 +127,13 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         }
     }
 
+    @Override
+    public void resetPayloadAfterFailedPublishing() {
+        for (BatchMessageContainerImpl batch : batches.values()) {
+            batch.resetPayloadAfterFailedPublishing();
+        }
+    }
+
     @Override
     public boolean hasSameSchema(MessageImpl<?> msg) {
         String key = getKey(msg);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 759145f376d..2752dce945f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -2353,7 +2353,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     processOpSendMsg(opSendMsg);
                 }
             } catch (Throwable t) {
-                log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
+                // Since there is a uncompleted payload was built, we should reset it.
+                batchMessageContainer.resetPayloadAfterFailedPublishing();
+                log.warn("[{}] [{}] Failed to create batch message for sending. Batch payloads have been reset and"
+                                + " messages will be retried in subsequent batches.", topic, producerName, t);
             } finally {
                 if (shouldScheduleNextBatchFlush) {
                     maybeScheduleBatchFlushTask();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 95053e5e7e0..2cb4f9a40e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -130,7 +130,8 @@ public class Commands {
     private static final int checksumSize = 4;
 
-    private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() {
+    @VisibleForTesting
+    static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() {
         @Override
         protected BaseCommand initialValue() throws Exception {
             return new BaseCommand();

Reply via email to