fmiguelez opened a new issue #7407:
URL: https://github.com/apache/pulsar/issues/7407


   **Describe the bug**
   
   The fix provided by #7139 for bug #4803 *does not work*.
   
   * When trying to read a message with a `null` value, a `NullPointerException` is thrown in another part of the code.
   ```
           msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
   
        java.lang.NullPointerException
                at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
                at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
                at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
                at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
                   ...
   ```
   * It should not be necessary to explicitly pass a `null` value to the producer (key-only messages should work just fine). With implicit `null` value messages, the exception thrown is an `EOFException`, the same as before the fix was provided.
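   
   The stack trace suggests that the stats recorder dereferences something that is `null` for a tombstone. I have not confirmed what line 169 actually reads; assuming it is the payload length, a null-safe update could look like the minimal sketch below. `NullSafeStats` and `payloadSize` are hypothetical names used only for illustration and are not part of the Pulsar client.
   
   ```java
   import java.util.concurrent.atomic.LongAdder;

   // Hypothetical stand-in for a consumer stats recorder; NOT Pulsar's actual class.
   public class NullSafeStats {
       private final LongAdder numMsgsReceived = new LongAdder();
       private final LongAdder totalBytesReceived = new LongAdder();

       // Assumption: a tombstone arrives with a null payload, so count it as 0 bytes
       // instead of dereferencing it.
       static int payloadSize(byte[] data) {
           return data == null ? 0 : data.length;
       }

       // Counts the message and adds its payload size without risking an NPE.
       void updateNumMsgsReceived(byte[] data) {
           numMsgsReceived.increment();
           totalBytesReceived.add(payloadSize(data));
       }

       long messages() { return numMsgsReceived.sum(); }

       long bytes() { return totalBytesReceived.sum(); }

       public static void main(String[] args) {
           NullSafeStats stats = new NullSafeStats();
           stats.updateNumMsgsReceived(new byte[] {1, 2, 3}); // regular message
           stats.updateNumMsgsReceived(null);                  // tombstone
           System.out.println(stats.messages() + " messages, " + stats.bytes() + " bytes");
       }
   }
   ```
   
   With a guard like this, a tombstone still counts as a received message but contributes no bytes to the totals.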
   
   **To Reproduce**
   I have created a test project to reproduce these issues (`null` values set implicitly and explicitly, with both a schema and a schemaless consumer): [pulsar-tombstone-test](https://github.com/fmiguelez/pulsar-tombstone-test)
   
   Read README.md to reproduce it.
   
   **Expected behavior**
   Tombstones (`null` values in messages with or without a schema, but with a key) should be supported whether or not you specify a schema, and whether you set the `null` value explicitly or leave it unset (implicit `null` value). All tests in the example project should pass.
   
   ```
   Producer<byte[]> schemalessProducer = client.newProducer(Schema.BYTES)
           .topic(TOPIC)
           .messageRoutingMode(MessageRoutingMode.SinglePartition)
           .create();
   
   Consumer<byte[]> schemalessConsumer = client.newConsumer(Schema.BYTES)
           .topic(TOPIC)
           .subscriptionName("test")
           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
           .subscribe();
   
   // Implicit tombstone without schema: key only, no value set
   schemalessProducer.newMessage().key("1").send();
   
   // Explicit tombstone (the one that is supposed to work)
   schemalessProducer.newMessage().key("2").value(null).send();
   
   Message<byte[]> implicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
   Message<byte[]> explicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
   
   System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}",
           implicitTombstone.getKey(), implicitTombstone.getValue()));
   System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}",
           explicitTombstone.getKey(), explicitTombstone.getValue()));
   ```
   
   ```
   Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class))
           .topic(TOPIC)
           .messageRoutingMode(MessageRoutingMode.SinglePartition)
           .create();
   
   Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class))
           .topic(TOPIC)
           .subscriptionName("test")
           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
           .subscribe();
   
   // Implicit tombstone with schema: key only, no value set
   schemaProducer.newMessage().key("1").send();
   
   // Explicit tombstone (the one that is supposed to work)
   schemaProducer.newMessage().key("2").value(null).send();
   
   Message<DummyObject> implicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
   Message<DummyObject> explicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
   
   System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}",
           implicitTombstone.getKey(), implicitTombstone.getValue()));
   System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}",
           explicitTombstone.getKey(), explicitTombstone.getValue()));
   ```
   
   
   **Screenshots**
   
   **Desktop (please complete the following information):**
    - Windows 10 with Docker Desktop to run Pulsar containers
   
   **Additional context**
   
   



