YannPerthuis opened a new issue #11877:
URL: https://github.com/apache/pulsar/issues/11877
**Describe the bug**
It seems to me that when a message is published to a topic within a
transaction, two entries are written to the topic (as seen in the Pulsar
Manager UI; probably a system message plus the actual message), as opposed to
only one entry when the message is published without a transaction.
When I use a reader to read all the messages in the same topic, I use the
'hasMessageAvailable' method and then read the message if there is one. The
problem is, after reading the last message in the topic, the
'hasMessageAvailable' method still returns 'true' instead of 'false'.
As a result, my reader tries to read the next message and blocks indefinitely.
I think that the 'hasMessageAvailable' method returns 'true' because the
broker API sees that my reader's cursor is not at the end of the topic (there
is still a system message after the current position of my cursor) and does not
differentiate between real and system messages. However, when the reader tries
to read the last message it gets stuck because the broker API has no real
message to return. This is just a guess.
PS: I tried to use the 'hasReachedEndOfTopic' method, but it is of no use to
me because it returns false all the time.
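To make the guess about system entries concrete, here is a toy model of it in plain Java. This is only a sketch of the suspected behaviour and not Pulsar's actual broker or client code: `ToyTopic`, `ToyReader`, and both `hasMessageAvailable...` variants are hypothetical names invented for illustration, and the assumption that each transactional publish appends one extra marker entry comes from what I see in the UI.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Pulsar code. It mimics the behaviour guessed at in this issue.
final class ToyTopic {
    // One log entry; marker == true stands for an assumed transaction system entry.
    static final class Entry {
        final String payload;
        final boolean marker;

        Entry(String payload, boolean marker) {
            this.payload = payload;
            this.marker = marker;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    // Assumption: a transactional publish appends the data entry plus one marker entry.
    void publishInTransaction(String payload) {
        log.add(new Entry(payload, false));
        log.add(new Entry(null, true));
    }

    // A plain publish appends only the data entry.
    void publish(String payload) {
        log.add(new Entry(payload, false));
    }

    int size() {
        return log.size();
    }

    Entry get(int index) {
        return log.get(index);
    }
}

final class ToyReader {
    private final ToyTopic topic;
    private int lastRead = -1; // index of the last data entry handed to the caller

    ToyReader(ToyTopic topic) {
        this.topic = topic;
    }

    // Suspected behaviour: any entry after the last read position counts as a message.
    boolean hasMessageAvailableNaive() {
        return lastRead < topic.size() - 1;
    }

    // Expected behaviour: only data entries count; marker entries are ignored.
    boolean hasMessageAvailableMarkerAware() {
        for (int i = lastRead + 1; i < topic.size(); i++) {
            if (!topic.get(i).marker) {
                return true;
            }
        }
        return false;
    }

    // Returns the next data payload, or null where a real blocking read would hang.
    String readNext() {
        for (int i = lastRead + 1; i < topic.size(); i++) {
            if (!topic.get(i).marker) {
                lastRead = i;
                return topic.get(i).payload;
            }
        }
        return null;
    }
}
```

In this model, after five transactional publishes and five reads, the naive check still reports a message because the trailing marker sits after the last read position, while `readNext()` has nothing to return. That matches the symptom I observe, but it does not prove this is what the broker does.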
**To Reproduce**
1. Run Pulsar standalone (with transactionCoordinatorEnabled=true in conf)
2. Run the following code:
```java
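import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;

public class Application {
    public static void main(String[] args) throws IOException {
        String[] messagesToPublish = new String[]{ "message-1", "message-2",
                "message-3", "message-4", "message-5" };
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .statsInterval(0, TimeUnit.SECONDS)
                .enableTransaction(true)
                .build();
        String sourceTopic = "public/default/test-topic";
        // Transactional sends require the send timeout to be disabled (0).
        Producer<String> producer = pulsarClient
                .newProducer(Schema.STRING)
                .topic(sourceTopic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        // Publish each message in its own transaction and commit it.
        Stream.of(messagesToPublish)
                .forEach(m -> {
                    try {
                        Transaction txn = pulsarClient
                                .newTransaction()
                                .build()
                                .get();
                        producer.newMessage(txn).value(m).send();
                        txn.commit().get();
                    } catch (PulsarClientException | ExecutionException |
                             InterruptedException e) {
                        e.printStackTrace();
                    }
                });
        // Read the topic from the beginning.
        Reader<String> reader = pulsarClient
                .newReader(Schema.STRING)
                .topic(sourceTopic)
                .subscriptionName("test")
                .startMessageId(MessageId.earliest)
                .create();
        while (reader.hasMessageAvailable()) {
            Message<String> message = reader.readNext(); // Stuck here after message-5
            System.out.println("New message received: " +
                    message.getValue());
        }
        System.out.println("Should logically be written but is not");
    }
}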
```
3. Note that the program never exits and that the final println is never
reached.
**Expected behavior**
The 'hasMessageAvailable' method should return 'false' once the reader has
read every message in a topic whose messages were each published in a
separate transaction.
**Desktop**
- OS: macOS v11.2.3
- Java: v8
- Pulsar (broker and client): v2.8.0