fmiguelez opened a new issue #7407:
URL: https://github.com/apache/pulsar/issues/7407
**Describe the bug**
The solution provided by #7139 for bug #4803 *does not work*.
* When trying to read a message with a `null` value, a `NullPointerException`
is thrown in another part of the code.
```
msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
java.lang.NullPointerException
    at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
    at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
    at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
    at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
    ...
```
* It should not be required to explicitly pass a `null` value to the producer
(key-only messages should work just fine). The exception thrown when working
with implicit `null` value messages is an `EOFException` in this case (the same
as before the fix was provided).
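
To make the two symptoms above concrete, here is a minimal, self-contained sketch that does not use the Pulsar client at all. It assumes (this is a guess, not something confirmed from the Pulsar source) that the `NullPointerException` comes from code that reads the payload length without a null check, and that the `EOFException` comes from a decoder being handed an empty payload. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only: none of these names exist in the Pulsar client.
final class NullPayloadDemo {

    // Mimics a stats counter that must tolerate a missing payload.
    // Without the null check, payload.length would throw a NullPointerException.
    static int payloadSize(byte[] payload) {
        return payload == null ? 0 : payload.length;
    }

    // Mimics a schema decoder that reads one 4-byte int from the payload.
    // A null or empty payload is treated as a tombstone and decodes to null.
    static Integer decode(byte[] payload) {
        if (payload == null || payload.length == 0) {
            return null;
        }
        try {
            return new DataInputStream(new ByteArrayInputStream(payload)).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same decoder without the guard: an empty payload runs out of bytes.
    static boolean unguardedDecodeHitsEof(byte[] payload) {
        try {
            new DataInputStream(new ByteArrayInputStream(payload)).readInt();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(payloadSize(null));
        System.out.println(decode(new byte[0]));
        System.out.println(decode(new byte[] {0, 0, 0, 7}));
        System.out.println(unguardedDecodeHitsEof(new byte[0]));
    }
}
```

Treating an empty payload as `null` is a simplification made for this sketch; a real client would likely need an explicit marker to tell an empty value apart from a missing one.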
**To Reproduce**
I have created a test project to reproduce these issues (`null` values set
implicitly and explicitly, with both schema and schemaless consumers):
[pulsar-tombstone-test](https://github.com/fmiguelez/pulsar-tombstone-test)
Read README.md to reproduce it.
**Expected behavior**
Tombstones (`null` values in messages with or without a schema, but with a key)
should be supported whether or not you specify a schema, and whether you set
the `null` value explicitly or leave it unset (implicit `null` value). All tests
should pass in the example project.
```
// Assumes an existing PulsarClient `client` and a String constant TOPIC.
Producer<byte[]> schemalessProducer = client.newProducer(Schema.BYTES)
        .topic(TOPIC)
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .create();
Consumer<byte[]> schemalessConsumer = client.newConsumer(Schema.BYTES)
        .topic(TOPIC)
        .subscriptionName("test")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();

// Implicit tombstone without schema (key only, no value set)
schemalessProducer.newMessage().key("1").send();
// Explicit tombstone (the one that is supposed to work)
schemalessProducer.newMessage().key("2").value(null).send();

Message<byte[]> implicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> explicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}",
        implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}",
        explicitTombstone.getKey(), explicitTombstone.getValue()));
```
```
// Assumes an existing PulsarClient `client`, a String constant TOPIC and a POJO DummyObject.
Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class))
        .topic(TOPIC)
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .create();
Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class))
        .topic(TOPIC)
        .subscriptionName("test")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();

// Implicit tombstone with schema (key only, no value set)
schemaProducer.newMessage().key("1").send();
// Explicit tombstone (the one that is supposed to work)
schemaProducer.newMessage().key("2").value(null).send();

Message<DummyObject> implicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
Message<DummyObject> explicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}",
        implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}",
        explicitTombstone.getKey(), explicitTombstone.getValue()));
```
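
For context on why tombstones matter to a consumer, here is a small in-memory sketch of how a keyed view would typically apply them: a non-null value overwrites the key and a `null` value deletes it. This reflects the usual meaning of a tombstone in keyed logs and is an assumption on my side, not Pulsar's implementation; `TombstoneView` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Pulsar API.
final class TombstoneView {
    private final Map<String, String> latest = new LinkedHashMap<>();

    // Applies one keyed message: a null value deletes the key,
    // any other value overwrites the previous one.
    void apply(String key, String value) {
        if (value == null) {
            latest.remove(key);
        } else {
            latest.put(key, value);
        }
    }

    // Returns a copy of the current key/value state.
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(latest);
    }

    public static void main(String[] args) {
        TombstoneView view = new TombstoneView();
        view.apply("1", "a");
        view.apply("2", "b");
        view.apply("1", null); // tombstone for key "1"
        System.out.println(view.snapshot());
    }
}
```

With this model it makes no difference whether the `null` came from `value(null)` or from a message that never had a value set, which is the behavior requested above.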
**Screenshots**
**Desktop (please complete the following information):**
- Windows 10 with Docker Desktop to run Pulsar containers
**Additional context**
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]