This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a3e7d763d2aa4e0ca72858189ccd20ec82da222d Author: Xiangying Meng <[email protected]> AuthorDate: Sat Aug 3 15:30:22 2024 +0800 [fix][client] the nullValue in msgMetadata should be true by default (#22372) Co-authored-by: xiangying <[email protected]> ### Motivation When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`. ``` org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452) at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139) at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677) at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221) at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50) at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969) at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194) at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148) at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.testng.TestRunner.privateRun(TestRunner.java:829) at org.testng.TestRunner.run(TestRunner.java:602) at org.testng.SuiteRunner.runTest(SuiteRunner.java:437) at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431) at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391) at org.testng.SuiteRunner.run(SuiteRunner.java:330) at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95) at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256) at org.testng.TestNG.runSuitesLocally(TestNG.java:1176) at org.testng.TestNG.runSuites(TestNG.java:1099) at org.testng.TestNG.run(TestNG.java:1067) at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65) at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462) ... 29 more Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49) at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80) at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157) at java.base/java.util.Optional.orElseGet(Optional.java:364) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156) at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689) at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574) ``` ### Modifications When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. (cherry picked from commit f3c177e2243e26a7849feb91dbed9fec4c5723c0) --- .../pulsar/client/api/ConsumerRedeliveryTest.java | 24 +++++++++++++++ .../pulsar/client/impl/MessageChecksumTest.java | 5 ++++ .../apache/pulsar/compaction/CompactionTest.java | 2 +- .../client/impl/TypedMessageBuilderImpl.java | 35 ++++++++++++---------- .../client/impl/TypedMessageBuilderImplTest.java | 17 +++++++++-- 5 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 90114add250..fcf1a638d58 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -424,4 +424,28 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { assertTrue(values.isEmpty()); } } + + @Test + public void testRedeliverMessagesWithoutValue() throws Exception { + String topic = "persistent://my-property/my-ns/testRedeliverMessagesWithoutValue"; + @Cleanup Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .enableRetry(true) + .subscribe(); + @Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(true) + .create(); + for (int i = 0; i < 10; i++) { + producer.newMessage().key("messages without value").send(); + } + + Message<Integer> message = consumer.receive(); + consumer.reconsumeLater(message, 2, TimeUnit.SECONDS); + for (int i = 0; i < 9; i++) { + assertNotNull(consumer.receive(5, TimeUnit.SECONDS)); + } + assertTrue(consumer.receive(5, TimeUnit.SECONDS).getTopicName().contains("sub-RETRY")); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java index 0b25e340956..94e76384750 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java @@ -24,6 +24,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + +import java.lang.reflect.Method; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -225,6 +227,9 @@ public class MessageChecksumTest extends BrokerTestBase { .create(); TypedMessageBuilderImpl<byte[]> msgBuilder = (TypedMessageBuilderImpl<byte[]>) producer.newMessage() .value("a message".getBytes()); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(msgBuilder); MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder() .setProducerName("test") .setSequenceId(1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 081831b0300..0cf32859e3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1384,7 +1384,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Message<byte[]> message4 = consumer.receive(); Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), ""); + assertNull(message4.getData()); Message<byte[]> message5 = consumer.receive(); Assert.assertEquals(message5.getKey(), "key4"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 026f8a1e69e..d90c2e88283 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { private final transient Schema<T> schema; private transient ByteBuffer content; private final transient TransactionImpl txn; + private transient T value; public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) { this(producer, schema, null); @@ -65,6 +66,22 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { } private long beforeSend() { + if (value == null) { + msgMetadata.setNullValue(true); + } else { + getKeyValueSchema().map(keyValueSchema -> { + if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + setSeparateKeyValue(value, keyValueSchema); + return this; + } else { + return null; + } + }).orElseGet(() -> { + content = ByteBuffer.wrap(schema.encode(value)); + return this; + }); + } + if (txn == null) { return -1L; } @@ -140,22 +157,8 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { @Override public TypedMessageBuilder<T> value(T value) { - if (value == null) { - msgMetadata.setNullValue(true); - return this; - } - - return getKeyValueSchema().map(keyValueSchema -> { - if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - setSeparateKeyValue(value, keyValueSchema); - return this; - } else { - return null; - } - }).orElseGet(() -> { - content = ByteBuffer.wrap(schema.encode(value)); - return this; - }); + this.value = value; + return this; } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java index 94c683e5271..05db4402a15 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java @@ -27,6 +27,8 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.mockito.Mock; import org.testng.annotations.Test; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Base64; @@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest { protected ProducerBase producerBase; @Test - public void testDefaultValue() { + public void testDefaultValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -63,6 +65,9 @@ public class TypedMessageBuilderImplTest { // Check kv.encoding.type default, not set value TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); @@ -73,7 +78,7 @@ public class TypedMessageBuilderImplTest { } @Test - public void testInlineValue() { + public void testInlineValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -91,6 +96,9 @@ public class TypedMessageBuilderImplTest { // Check kv.encoding.type INLINE TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); @@ -101,7 +109,7 @@ public class TypedMessageBuilderImplTest { } @Test - public void testSeparatedValue() { + public void testSeparatedValue() throws Exception { producerBase = mock(ProducerBase.class); AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -119,6 +127,9 @@ public class TypedMessageBuilderImplTest { // Check kv.encoding.type SEPARATED TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte);
