vroyer opened a new issue, #212:
URL: https://github.com/apache/pulsar-client-reactive/issues/212

   As the pipeline start() is actually asynchronous (see Mono.subscribe() 
javadoc), we get inconsistent results when sending a message right after 
starting a pipeline with an initial subscription position set to **Latest**. 
The issue is reproductible through the test available 
[here](https://github.com/exostrive/reactor-pulsar-issue). It's not always 
failing, but sometimes, the message is probably sent before the subscription is 
actually started, so we never get the expected message with the Latest initial 
position.
   
   `reactor-pulsar-issue% ./gradlew test --rerun-tasks
   
   > Task :test
   
   IssueTest STANDARD_ERROR
       SLF4J(W): No SLF4J providers were found.
       SLF4J(W): Defaulting to no-operation (NOP) logger implementation
       SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further 
details.
   
   IssueTest > testStartStopStartPipeline(int) > [1] 1 STANDARD_OUT
       Message date=Sun May 18 22:18:27 CEST 2025 value=1
       consumers = [ConsumerStatsImpl(appId=null, msgRateOut=0.0, 
msgThroughputOut=0.0, bytesOutCounter=52, msgOutCounter=1, 
msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, 
consumerName=2ZaVu, availablePermits=999, unackedMessages=0, 
avgMessagesPerEntry=1, blockedConsumerOnUnackedMsgs=false, 
readPositionWhenJoining=null, drainingHashesCount=0, 
drainingHashesClearedTotal=0, drainingHashesUnackedMessages=0, 
drainingHashes=null, address=/172.17.0.1:49996, 
connectedSince=2025-05-18T20:18:27.222776635Z, 
clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=1747599507323, 
lastConsumedTimestamp=1747599507247, lastConsumedFlowTimestamp=1747599507238, 
keyHashRangeArrays=null, keyHashRanges=null, metadata={})]
   
   IssueTest > testStartStopStartPipeline(int) > [2] 2 STANDARD_OUT
       consumers = [ConsumerStatsImpl(appId=null, msgRateOut=0.0, 
msgThroughputOut=0.0, bytesOutCounter=0, msgOutCounter=0, msgRateRedeliver=0.0, 
messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=sLIoI, 
availablePermits=1000, unackedMessages=0, avgMessagesPerEntry=0, 
blockedConsumerOnUnackedMsgs=false, readPositionWhenJoining=null, 
drainingHashesCount=0, drainingHashesClearedTotal=0, 
drainingHashesUnackedMessages=0, drainingHashes=null, 
address=/172.17.0.1:32996, connectedSince=2025-05-18T20:18:39.520636669Z, 
clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=0, 
lastConsumedTimestamp=0, lastConsumedFlowTimestamp=1747599519524, 
keyHashRangeArrays=null, keyHashRanges=null, metadata={}), 
ConsumerStatsImpl(appId=null, msgRateOut=0.0, msgThroughputOut=0.0, 
bytesOutCounter=52, msgOutCounter=1, msgRateRedeliver=0.0, messageAckRate=0.0, 
chunkedMessageRate=0.0, consumerName=WLb84, availablePermits=999, 
unackedMessages=1, avgMessagesPerEntry=1, blockedConsumerOnUnackedMsgs=false, 
read
 PositionWhenJoining=null, drainingHashesCount=0, drainingHashesClearedTotal=0, 
drainingHashesUnackedMessages=0, drainingHashes=null, 
address=/172.17.0.1:32996, connectedSince=2025-05-18T20:18:39.52273146Z, 
clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=0, 
lastConsumedTimestamp=1747599519539, lastConsumedFlowTimestamp=1747599519525, 
keyHashRangeArrays=null, keyHashRanges=null, metadata={})]
   
   IssueTest > testStartStopStartPipeline(int) > [2] 2 FAILED
       org.opentest4j.AssertionFailedError at IssueTest.java:92
   
   2 tests completed, 1 failed`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to