thetumbled commented on PR #21917:
URL: https://github.com/apache/pulsar/pull/21917#issuecomment-1899676966

   > @thetumbled @codelipenghui The following test can reproduce the problem, and this PR can fix it.
   > 
   > ```java
   >     @Test
   >     public void testAllCompactedOut() throws Exception {
   >         String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";
   > 
   >         @Cleanup
   >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
   >                 .enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
   > 
   >         producer.newMessage().key("K1").value("V1").sendAsync();
   >         producer.newMessage().key("K2").value("V2").sendAsync();
   >         producer.newMessage().key("K2").value(null).sendAsync();
   >         producer.flush();
   > 
   >         admin.topics().triggerCompaction(topicName);
   > 
   >         Awaitility.await().untilAsserted(() -> {
   >             assertEquals(admin.topics().compactionStatus(topicName).status,
   >                     LongRunningProcessStatus.Status.SUCCESS);
   >         });
   > 
   >         producer.newMessage().key("K1").value(null).sendAsync();
   >         producer.flush();
   > 
   >         admin.topics().triggerCompaction(topicName);
   > 
   >         Awaitility.await().untilAsserted(() -> {
   >             assertEquals(admin.topics().compactionStatus(topicName).status,
   >                     LongRunningProcessStatus.Status.SUCCESS);
   >         });
   > 
   >         @Cleanup
   >         Reader<String> reader = pulsarClient.newReader(Schema.STRING)
   >                 .subscriptionName("reader-test")
   >                 .topic(topicName)
   >                 .readCompacted(true)
   >                 .startMessageId(MessageId.earliest)
   >                 .create();
   >         Assert.assertEquals(reader.getLastMessageIds().get(0), MessageIdImpl.earliest);
   >         while (reader.hasMessageAvailable()) {
   >             Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
   >             Assert.assertNotNull(message);
   >         }
   >     }
   > ```
   
   Great job, but there is a problem with the test code: we need to set the config `topicCompactionRetainNullKey` to true, so that the empty message is written into the compacted topic and the exception is thrown.
   I have pushed the updated test code, PTAL, thanks! @coderzc
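
   For reference, a minimal sketch of the setting mentioned above, assuming it is applied through the broker configuration file (in a broker unit test the same value would presumably be set on the test's `ServiceConfiguration` before the broker starts; that placement is an assumption, not taken from the pushed test):

   ```properties
   # Assumed broker.conf entry: keep null-key messages during topic compaction
   topicCompactionRetainNullKey=true
   ```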

