Demogorgon314 commented on code in PR #21170:
URL: https://github.com/apache/pulsar/pull/21170#discussion_r1338046372
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -438,4 +444,55 @@ public void testTableViewTailMessageReadRetry() throws
Exception {
});
verify(consumer, times(msgCnt)).receiveAsync();
}
+
+ @Test
+ public void testBuildTableViewWithMessagesAlwaysAvailable() throws
Exception {
+ String topic =
"persistent://public/default/testBuildTableViewWithMessagesAlwaysAvailable";
+ admin.topics().createPartitionedTopic(topic, 10);
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ // Prepare real data to do test.
+ for (int i = 0; i < 1000; i++) {
+ producer.newMessage().send();
+ }
+ List<TopicMessageId> lastMessageIds = reader.getLastMessageIds();
+
+ // Use mock reader to build tableview. In the old implementation, the
readAllExistingMessages method
+ // will not be completed because the
`mockReader.hasMessageAvailable()` always returns true.
+ Reader<byte[]> mockReader = spy(reader);
+ when(mockReader.hasMessageAvailable()).thenReturn(true);
+
when(mockReader.getLastMessageIdsAsync()).thenReturn(CompletableFuture.completedFuture(lastMessageIds));
+ AtomicInteger index = new AtomicInteger(lastMessageIds.size());
+ when(mockReader.readNextAsync()).thenAnswer(invocation -> {
+ Message<byte[]> message = spy(Message.class);
+ int localIndex = index.decrementAndGet();
+
when(message.getTopicName()).thenReturn(lastMessageIds.get(localIndex).getOwnerTopic());
+
when(message.getMessageId()).thenReturn(lastMessageIds.get(localIndex));
+ when(message.hasKey()).thenReturn(false);
+ doNothing().when(message).release();
+ return CompletableFuture.completedFuture(message);
+ });
+ @Cleanup
+ TableViewImpl<byte[]> tableView = (TableViewImpl<byte[]>)
pulsarClient.newTableView()
+ .topic(topic)
+ .createAsync()
+ .get();
+ TableViewImpl<byte[]> mockTableView = spy(tableView);
+ Method readAllExistingMessagesMethod = TableViewImpl.class
+ .getDeclaredMethod("readAllExistingMessages", Reader.class);
+ readAllExistingMessagesMethod.setAccessible(true);
+ CompletableFuture<Reader<?>> future =
Review Comment:
Should we send more messages before invoking the `readAllExistingMessages`
method, to verify that the newly sent messages are not received?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>>
readAllExistingMessages(Reader<T> reader) {
AtomicLong messagesRead = new AtomicLong();
CompletableFuture<Reader<T>> future = new CompletableFuture<>();
- readAllExistingMessages(reader, future, startTime, messagesRead);
+ reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+ Map<String, TopicMessageId> maxMessageIds = new HashMap<>();
+ lastMessageIds.forEach(topicMessageId -> {
+ maxMessageIds.put(topicMessageId.getOwnerTopic(),
topicMessageId);
+ });
+ readAllExistingMessages(reader, future, startTime, messagesRead,
maxMessageIds);
+ });
return future;
}
private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Reader<T>> future, long startTime,
- AtomicLong messagesRead) {
+ AtomicLong messagesRead, Map<String,
TopicMessageId> maxMessageIds) {
reader.hasMessageAvailableAsync()
.thenAccept(hasMessage -> {
if (hasMessage) {
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
handleMessage(msg);
- readAllExistingMessages(reader, future,
startTime, messagesRead);
+ // The message is read one by one in a
single thread,
+ // so it's fine that uses a hashmap to store
last message ID.
+ TopicMessageId maxMessageId =
maxMessageIds.get(msg.getTopicName());
+ if (maxMessageId != null &&
msg.getMessageId().compareTo(maxMessageId) >= 0) {
+ maxMessageIds.remove(msg.getTopicName());
+ }
+ if (maxMessageIds.size() == 0) {
Review Comment:
```suggestion
if (maxMessageIds.isEmpty()) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]