This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 054117603cb [fix][client] the nullValue in msgMetadata should be true
by default (#22372)
054117603cb is described below
commit 054117603cb6e5624fa6fb5d1a2a50f33b9b9fae
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Aug 3 15:30:22 2024 +0800
[fix][client] the nullValue in msgMetadata should be true by default
(#22372)
Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be
true and change to false after the value is set. Otherwise, the message data
will be set as a [] when the value is not set, that would cause the message
data to be encoded and throw a `SchemaSerializationException` when calling
`reconsumerLater`.
```
org.apache.pulsar.client.api.PulsarClientException:
java.util.concurrent.ExecutionException:
org.apache.pulsar.client.api.SchemaSerializationException: Size of data
received by IntSchema is not 4
at
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
at
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
at
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
at
org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at
org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
at
org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
at
org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
at
org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
at
org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
at
org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
at
org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
at
org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.testng.TestRunner.privateRun(TestRunner.java:829)
at org.testng.TestRunner.run(TestRunner.java:602)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
at org.testng.SuiteRunner.run(SuiteRunner.java:330)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
at org.testng.TestNG.runSuites(TestNG.java:1099)
at org.testng.TestNG.run(TestNG.java:1067)
at
com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
at
com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException:
org.apache.pulsar.client.api.SchemaSerializationException: Size of data
received by IntSchema is not 4
at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size
of data received by IntSchema is not 4
at
org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
at
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
at
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
at
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
at
org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
at
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
at
org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be
true and change to false after the value is set.
(cherry picked from commit f3c177e2243e26a7849feb91dbed9fec4c5723c0)
---
.../pulsar/client/api/ConsumerRedeliveryTest.java | 24 +++++++++++++++
.../pulsar/client/impl/MessageChecksumTest.java | 5 ++++
.../apache/pulsar/compaction/CompactionTest.java | 2 +-
.../client/impl/TypedMessageBuilderImpl.java | 35 ++++++++++++----------
.../client/impl/TypedMessageBuilderImplTest.java | 17 +++++++++--
5 files changed, 63 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 90114add250..fcf1a638d58 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -424,4 +424,28 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
assertTrue(values.isEmpty());
}
}
+
+ @Test
+ public void testRedeliverMessagesWithoutValue() throws Exception {
+ String topic =
"persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
+ @Cleanup Consumer<Integer> consumer =
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .enableRetry(true)
+ .subscribe();
+ @Cleanup Producer<Integer> producer =
pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(true)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key("messages without value").send();
+ }
+
+ Message<Integer> message = consumer.receive();
+ consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);
+ for (int i = 0; i < 9; i++) {
+ assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
+ }
+ assertTrue(consumer.receive(5,
TimeUnit.SECONDS).getTopicName().contains("sub-RETRY"));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
index 515b34db850..f84c840fc3b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+
+import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -224,6 +226,9 @@ public class MessageChecksumTest extends BrokerTestBase {
.create();
TypedMessageBuilderImpl<byte[]> msgBuilder =
(TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("a message".getBytes());
+ Method method =
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+ method.setAccessible(true);
+ method.invoke(msgBuilder);
MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
.setProducerName("test")
.setSequenceId(1)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 9b67f45a1c0..dc33a02621b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1387,7 +1387,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Message<byte[]> message4 = consumer.receive();
Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "");
+ assertNull(message4.getData());
Message<byte[]> message5 = consumer.receive();
Assert.assertEquals(message5.getKey(), "key4");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 026f8a1e69e..d90c2e88283 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
private final transient Schema<T> schema;
private transient ByteBuffer content;
private final transient TransactionImpl txn;
+ private transient T value;
public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema)
{
this(producer, schema, null);
@@ -65,6 +66,22 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
}
private long beforeSend() {
+ if (value == null) {
+ msgMetadata.setNullValue(true);
+ } else {
+ getKeyValueSchema().map(keyValueSchema -> {
+ if (keyValueSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
+ setSeparateKeyValue(value, keyValueSchema);
+ return this;
+ } else {
+ return null;
+ }
+ }).orElseGet(() -> {
+ content = ByteBuffer.wrap(schema.encode(value));
+ return this;
+ });
+ }
+
if (txn == null) {
return -1L;
}
@@ -140,22 +157,8 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
@Override
public TypedMessageBuilder<T> value(T value) {
- if (value == null) {
- msgMetadata.setNullValue(true);
- return this;
- }
-
- return getKeyValueSchema().map(keyValueSchema -> {
- if (keyValueSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
- setSeparateKeyValue(value, keyValueSchema);
- return this;
- } else {
- return null;
- }
- }).orElseGet(() -> {
- content = ByteBuffer.wrap(schema.encode(value));
- return this;
- });
+ this.value = value;
+ return this;
}
@Override
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
index 94c683e5271..05db4402a15 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.mockito.Mock;
import org.testng.annotations.Test;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Base64;
@@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest {
protected ProducerBase producerBase;
@Test
- public void testDefaultValue() {
+ public void testDefaultValue() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);
AvroSchema<SchemaTestUtils.Foo> fooSchema =
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -63,6 +65,9 @@ public class TypedMessageBuilderImplTest {
// Check kv.encoding.type default, not set value
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder =
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+ Method method =
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+ method.setAccessible(true);
+ method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
@@ -73,7 +78,7 @@ public class TypedMessageBuilderImplTest {
}
@Test
- public void testInlineValue() {
+ public void testInlineValue() throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);
AvroSchema<SchemaTestUtils.Foo> fooSchema =
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -91,6 +96,9 @@ public class TypedMessageBuilderImplTest {
// Check kv.encoding.type INLINE
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder =
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+ Method method =
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+ method.setAccessible(true);
+ method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
@@ -101,7 +109,7 @@ public class TypedMessageBuilderImplTest {
}
@Test
- public void testSeparatedValue() {
+ public void testSeparatedValue() throws Exception {
producerBase = mock(ProducerBase.class);
AvroSchema<SchemaTestUtils.Foo> fooSchema =
AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
@@ -119,6 +127,9 @@ public class TypedMessageBuilderImplTest {
// Check kv.encoding.type SEPARATED
TypedMessageBuilderImpl typedMessageBuilder =
(TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
+ Method method =
TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
+ method.setAccessible(true);
+ method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);