This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 054117603cb [fix][client] the nullValue in msgMetadata should be true 
by default (#22372)
054117603cb is described below

commit 054117603cb6e5624fa6fb5d1a2a50f33b9b9fae
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Aug 3 15:30:22 2024 +0800

    [fix][client] the nullValue in msgMetadata should be true by default 
(#22372)
    
    Co-authored-by: xiangying <[email protected]>
    ### Motivation
    When a message is not set value, the `nullValue` message metadata should be 
true and change to false after the value is set. Otherwise, the message data 
will be set as a [] when the value is not set, that would cause the message 
data to be encoded and throw a `SchemaSerializationException` when calling 
`reconsumerLater`.
    ```
    
    org.apache.pulsar.client.api.PulsarClientException: 
java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.SchemaSerializationException: Size of data 
received by IntSchema is not 4
    
            at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
            at 
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
            at 
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
            at 
org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
            at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
            at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:568)
            at 
org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
            at 
org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
            at 
org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
            at 
org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
            at 
org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
            at 
org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
            at 
org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
            at 
org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
            at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
            at org.testng.TestRunner.privateRun(TestRunner.java:829)
            at org.testng.TestRunner.run(TestRunner.java:602)
            at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
            at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
            at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
            at org.testng.SuiteRunner.run(SuiteRunner.java:330)
            at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
            at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
            at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
            at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
            at org.testng.TestNG.runSuites(TestNG.java:1099)
            at org.testng.TestNG.run(TestNG.java:1067)
            at 
com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
            at 
com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
    Caused by: java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.SchemaSerializationException: Size of data 
received by IntSchema is not 4
            at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
            at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
            at 
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
            ... 29 more
    Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size 
of data received by IntSchema is not 4
            at 
org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
            at 
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
            at 
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
            at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
            at java.base/java.util.Optional.orElseGet(Optional.java:364)
            at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
            at 
org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
            at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
            at 
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
    ```
    ### Modifications
    When a message is not set value, the `nullValue` message metadata should be 
true and change to false after the value is set.
    
    (cherry picked from commit f3c177e2243e26a7849feb91dbed9fec4c5723c0)
---
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 24 +++++++++++++++
 .../pulsar/client/impl/MessageChecksumTest.java    |  5 ++++
 .../apache/pulsar/compaction/CompactionTest.java   |  2 +-
 .../client/impl/TypedMessageBuilderImpl.java       | 35 ++++++++++++----------
 .../client/impl/TypedMessageBuilderImplTest.java   | 17 +++++++++--
 5 files changed, 63 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 90114add250..fcf1a638d58 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -424,4 +424,28 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
             assertTrue(values.isEmpty());
         }
     }
+
+    @Test
+    public void testRedeliverMessagesWithoutValue() throws Exception {
+        String topic = 
"persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
+        @Cleanup Consumer<Integer> consumer = 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .enableRetry(true)
+                .subscribe();
+        @Cleanup Producer<Integer> producer = 
pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(true)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key("messages without value").send();
+        }
+
+        Message<Integer> message = consumer.receive();
+        consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);
+        for (int i = 0; i < 9; i++) {
+            assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
+        }
+        assertTrue(consumer.receive(5, 
TimeUnit.SECONDS).getTopicName().contains("sub-RETRY"));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
index 515b34db850..f84c840fc3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+
+import java.lang.reflect.Method;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -224,6 +226,9 @@ public class MessageChecksumTest extends BrokerTestBase {
                 .create();
         TypedMessageBuilderImpl<byte[]> msgBuilder = 
(TypedMessageBuilderImpl<byte[]>) producer.newMessage()
                 .value("a message".getBytes());
+        Method method = 
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+        method.setAccessible(true);
+        method.invoke(msgBuilder);
         MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
                 .setProducerName("test")
                 .setSequenceId(1)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 9b67f45a1c0..dc33a02621b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1387,7 +1387,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
                 Message<byte[]> message4 = consumer.receive();
                 Assert.assertEquals(message4.getKey(), "key2");
-                Assert.assertEquals(new String(message4.getData()), "");
+                assertNull(message4.getData());
 
                 Message<byte[]> message5 = consumer.receive();
                 Assert.assertEquals(message5.getKey(), "key4");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 026f8a1e69e..d90c2e88283 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
     private final transient Schema<T> schema;
     private transient ByteBuffer content;
     private final transient TransactionImpl txn;
+    private transient T value;
 
     public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) 
{
         this(producer, schema, null);
@@ -65,6 +66,22 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
     }
 
     private long beforeSend() {
+        if (value == null) {
+            msgMetadata.setNullValue(true);
+        } else {
+            getKeyValueSchema().map(keyValueSchema -> {
+                if (keyValueSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
+                    setSeparateKeyValue(value, keyValueSchema);
+                    return this;
+                } else {
+                    return null;
+                }
+            }).orElseGet(() -> {
+                content = ByteBuffer.wrap(schema.encode(value));
+                return this;
+            });
+        }
+
         if (txn == null) {
             return -1L;
         }
@@ -140,22 +157,8 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> value(T value) {
-        if (value == null) {
-            msgMetadata.setNullValue(true);
-            return this;
-        }
-
-        return getKeyValueSchema().map(keyValueSchema -> {
-            if (keyValueSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
-                setSeparateKeyValue(value, keyValueSchema);
-                return this;
-            } else {
-                return null;
-            }
-        }).orElseGet(() -> {
-            content = ByteBuffer.wrap(schema.encode(value));
-            return this;
-        });
+        this.value = value;
+        return this;
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
index 94c683e5271..05db4402a15 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.mockito.Mock;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 
@@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest {
     protected ProducerBase producerBase;
 
     @Test
-    public void testDefaultValue() {
+    public void testDefaultValue() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
         producerBase = mock(ProducerBase.class);
 
         AvroSchema<SchemaTestUtils.Foo> fooSchema = 
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -63,6 +65,9 @@ public class TypedMessageBuilderImplTest {
 
         // Check kv.encoding.type default, not set value
         TypedMessageBuilderImpl<KeyValue>  typedMessageBuilder = 
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        Method method = 
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+        method.setAccessible(true);
+        method.invoke(typedMessageBuilder);
         ByteBuffer content = typedMessageBuilder.getContent();
         byte[] contentByte = new byte[content.remaining()];
         content.get(contentByte);
@@ -73,7 +78,7 @@ public class TypedMessageBuilderImplTest {
     }
 
     @Test
-    public void testInlineValue() {
+    public void testInlineValue() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
         producerBase = mock(ProducerBase.class);
 
         AvroSchema<SchemaTestUtils.Foo> fooSchema = 
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -91,6 +96,9 @@ public class TypedMessageBuilderImplTest {
 
         // Check kv.encoding.type INLINE
         TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = 
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        Method method = 
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+        method.setAccessible(true);
+        method.invoke(typedMessageBuilder);
         ByteBuffer content = typedMessageBuilder.getContent();
         byte[] contentByte = new byte[content.remaining()];
         content.get(contentByte);
@@ -101,7 +109,7 @@ public class TypedMessageBuilderImplTest {
     }
 
     @Test
-    public void testSeparatedValue() {
+    public void testSeparatedValue() throws Exception {
         producerBase = mock(ProducerBase.class);
 
         AvroSchema<SchemaTestUtils.Foo> fooSchema = 
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -119,6 +127,9 @@ public class TypedMessageBuilderImplTest {
 
         // Check kv.encoding.type SEPARATED
         TypedMessageBuilderImpl typedMessageBuilder = 
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+        Method method = 
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+        method.setAccessible(true);
+        method.invoke(typedMessageBuilder);
         ByteBuffer content = typedMessageBuilder.getContent();
         byte[] contentByte = new byte[content.remaining()];
         content.get(contentByte);

Reply via email to