vroyer opened a new issue, #212: URL: https://github.com/apache/pulsar-client-reactive/issues/212
Because the pipeline `start()` is asynchronous (see the `Mono.subscribe()` javadoc), we get inconsistent results when sending a message right after starting a pipeline whose initial subscription position is **Latest**. The issue is reproducible with the test available [here](https://github.com/exostrive/reactor-pulsar-issue). It does not fail every time, but sometimes the message is probably sent before the subscription has actually started, so the consumer never receives the expected message with the Latest initial position.

```
reactor-pulsar-issue% ./gradlew test --rerun-tasks

> Task :test

IssueTest STANDARD_ERROR
    SLF4J(W): No SLF4J providers were found.
    SLF4J(W): Defaulting to no-operation (NOP) logger implementation
    SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.

IssueTest > testStartStopStartPipeline(int) > [1] 1 STANDARD_OUT
    Message date=Sun May 18 22:18:27 CEST 2025 value=1
    consumers = [ConsumerStatsImpl(appId=null, msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=52, msgOutCounter=1, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=2ZaVu, availablePermits=999, unackedMessages=0, avgMessagesPerEntry=1, blockedConsumerOnUnackedMsgs=false, readPositionWhenJoining=null, drainingHashesCount=0, drainingHashesClearedTotal=0, drainingHashesUnackedMessages=0, drainingHashes=null, address=/172.17.0.1:49996, connectedSince=2025-05-18T20:18:27.222776635Z, clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=1747599507323, lastConsumedTimestamp=1747599507247, lastConsumedFlowTimestamp=1747599507238, keyHashRangeArrays=null, keyHashRanges=null, metadata={})]

IssueTest > testStartStopStartPipeline(int) > [2] 2 STANDARD_OUT
    consumers = [ConsumerStatsImpl(appId=null, msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=0, msgOutCounter=0, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=sLIoI, availablePermits=1000, unackedMessages=0, avgMessagesPerEntry=0, blockedConsumerOnUnackedMsgs=false, readPositionWhenJoining=null, drainingHashesCount=0, drainingHashesClearedTotal=0, drainingHashesUnackedMessages=0, drainingHashes=null, address=/172.17.0.1:32996, connectedSince=2025-05-18T20:18:39.520636669Z, clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=0, lastConsumedTimestamp=0, lastConsumedFlowTimestamp=1747599519524, keyHashRangeArrays=null, keyHashRanges=null, metadata={}), ConsumerStatsImpl(appId=null, msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=52, msgOutCounter=1, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=WLb84, availablePermits=999, unackedMessages=1, avgMessagesPerEntry=1, blockedConsumerOnUnackedMsgs=false, readPositionWhenJoining=null, drainingHashesCount=0, drainingHashesClearedTotal=0, drainingHashesUnackedMessages=0, drainingHashes=null, address=/172.17.0.1:32996, connectedSince=2025-05-18T20:18:39.52273146Z, clientVersion=Pulsar-Java-v4.0.4, lastAckedTimestamp=0, lastConsumedTimestamp=1747599519539, lastConsumedFlowTimestamp=1747599519525, keyHashRangeArrays=null, keyHashRanges=null, metadata={})]

IssueTest > testStartStopStartPipeline(int) > [2] 2 FAILED
    org.opentest4j.AssertionFailedError at IssueTest.java:92

2 tests completed, 1 failed
```
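To illustrate the suspected race without a broker, here is a minimal JDK-only sketch. It is a toy model and not the pulsar-client-reactive API: `ToyTopic`, `startAsync`, `sendWithoutWaiting` and `sendAfterSubscribed` are hypothetical names, and the `CountDownLatch` stands in for the delay before the subscription is actually established. It assumes only that a Latest subscription receives no message published before it exists.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class LatestSubscriptionRace {

    /** Toy in-memory topic: only inboxes already subscribed see a published message (Latest-like). */
    static final class ToyTopic {
        private final List<List<String>> inboxes = new CopyOnWriteArrayList<>();

        void subscribeLatest(List<String> inbox) {
            inboxes.add(inbox);
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    /** Hypothetical asynchronous start: the subscription is registered only once brokerReady opens. */
    static CompletableFuture<Void> startAsync(ToyTopic topic, List<String> inbox, CountDownLatch brokerReady) {
        return CompletableFuture.runAsync(() -> {
            try {
                brokerReady.await(); // simulated delay before the subscription exists
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            topic.subscribeLatest(inbox);
        });
    }

    /** Publishes right after start returns, while the subscription is still pending. */
    static List<String> sendWithoutWaiting() {
        ToyTopic topic = new ToyTopic();
        List<String> inbox = new CopyOnWriteArrayList<>();
        CountDownLatch brokerReady = new CountDownLatch(1);
        CompletableFuture<Void> started = startAsync(topic, inbox, brokerReady);
        topic.publish("1");      // no subscriber yet, so the message is missed
        brokerReady.countDown(); // the subscription completes only now
        started.join();
        return inbox;
    }

    /** Waits for the subscription to be registered before publishing. */
    static List<String> sendAfterSubscribed() {
        ToyTopic topic = new ToyTopic();
        List<String> inbox = new CopyOnWriteArrayList<>();
        startAsync(topic, inbox, new CountDownLatch(0)).join(); // gate already open
        topic.publish("1");
        return inbox;
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + sendWithoutWaiting()); // prints "without waiting: []"
        System.out.println("after waiting: " + sendAfterSubscribed());  // prints "after waiting: [1]"
    }
}
```

In this model the first case corresponds to the failing run `[2]` and the second to the passing run `[1]`. Whether the real pipeline exposes a comparable "subscription established" signal to wait on is the open question of this issue.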