This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95d24ac4550 [feat][client] Introduce Refresh API in the TableView (#21417)
95d24ac4550 is described below

commit 95d24ac4550253c906913d77ccb7dfd3a4cd31d3
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Mar 15 16:57:43 2024 +0800

    [feat][client] Introduce Refresh API in the TableView (#21417)
    
    Master https://github.com/apache/pulsar/pull/21271
    ### Motivation
    This proposal introduces a new API that refreshes the table view with the latest data written to the topic, ensuring that all subsequent reads are based on the refreshed data.
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 134 +++++++++++++++++++-
 .../org/apache/pulsar/client/api/TableView.java    |  34 +++++
 .../apache/pulsar/client/impl/TableViewImpl.java   | 141 +++++++++++++++++----
 3 files changed, 282 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 523360884c1..61ab4de8a32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -38,10 +38,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Message;
@@ -49,6 +51,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
@@ -101,10 +104,11 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
     }
 
     private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
-        return publishMessages(topic, count, enableBatch, false);
+        return publishMessages(topic, 0, count, enableBatch, false); // keys "key0".."key<count-1>", unencrypted
     }
 
-    private Set<String> publishMessages(String topic, int count, boolean 
enableBatch, boolean enableEncryption) throws Exception {
+    private Set<String> publishMessages(String topic, int keyStartPosition, 
int count, boolean enableBatch,
+                                        boolean enableEncryption) throws 
Exception {
         Set<String> keys = new HashSet<>();
         ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
         builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
@@ -124,7 +128,7 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         }
         try (Producer<byte[]> producer = builder.create()) {
             CompletableFuture<?> lastFuture = null;
-            for (int i = 0; i < count; i++) {
+            for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
                 String key = "key"+ i;
                 byte[] data = ("my-message-" + i).getBytes();
                 lastFuture = 
producer.newMessage().key(key).value(data).sendAsync();
@@ -136,6 +140,126 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         return keys;
     }
 
+    @DataProvider(name = "partition")
+    public static Object[][] partition() {
+        // Partition count of the topic under test; 0 means a non-partitioned topic.
+        Object[][] partitionCounts = {{3}, {0}};
+        return partitionCounts;
+    }
+
+    /**
+     * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages.
+     * 3. Call the new `refresh` API; it waits until all the messages have been read.
+     * Case2:
+     * 1. No new messages.
+     * 2. Call the new `refresh` API; it completes immediately.
+     * Case3:
+     * 1. Multi-partition topic where some partitions have new messages and the others have none.
+     * 2. Call the new `refresh` API; it completes once the new messages have been read.
+     */
+    @Test(dataProvider = "partition", timeOut = 30 * 1000)
+    public void testRefreshAPI(int partition) throws Exception {
+        // 1. Prepare resource.
+        String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
+        if (partition == 0) {
+            admin.topics().createNonPartitionedTopic(topic);
+        } else {
+            admin.topics().createPartitionedTopic(topic, partition);
+        }
+
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+                .topic(topic)
+                .create();
+        // 2. Add a listen action to provide the test environment.
+        // The listen action is triggered synchronously for every incoming message,
+        // so sleeping in it slows down the rate at which messages are read.
+        tv.listen((k, v) -> {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        // 3. Send 20 messages. After refresh, all the messages should be received.
+        int count = 20;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        // With a 100 ms listener per message, the table view needs at least 2 seconds to read all 20 messages,
+        // so without the refresh call the assertions below would only see part of them.
+        tv.refresh();
+        // The key of each message is different.
+        assertEquals(tv.size(), count);
+        assertEquals(tv.keySet(), keys);
+        // 4. Refresh must also complete when some partitions have no new messages (the new keys overlap the
+        // first batch, adding partition - 1 new keys), or when a non-partitioned topic has no new messages.
+        if (partition > 0) {
+            publishMessages(topic, partition - 1, count, false, false);
+            tv.refreshAsync().get(5, TimeUnit.SECONDS);
+            assertEquals(tv.size(), count + partition - 1);
+        } else {
+            tv.refreshAsync().get(5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages.
+     * 3. Call the new `refresh` API.
+     * 4. Close the reader of the tableview.
+     * 5. The refresh operation fails with an `AlreadyClosedException`.
+     * Case2:
+     * 1. Close the reader of the tableview.
+     * 2. Call the new `refresh` API.
+     * 3. The refresh operation fails with an `AlreadyClosedException`.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception {
+        // 1. Prepare resource.
+        String topic1 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
+        admin.topics().createNonPartitionedTopic(topic1);
+        String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
+        admin.topics().createNonPartitionedTopic(topic2);
+        @Cleanup
+        TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
+                .topic(topic1)
+                .create();
+        @Cleanup
+        TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
+                .topic(topic2)
+                .create();
+        // 2. Slow down the rate of reading messages.
+        tv1.listen((k, v) -> {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        publishMessages(topic1, 20, false);
+        AtomicBoolean completedExceptionally = new AtomicBoolean(false);
+        // 3. Test failing `refresh` in the reading process.
+        tv1.refreshAsync().exceptionally(ex -> {
+            if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
+                completedExceptionally.set(true);
+            }
+            return null;
+        });
+        tv1.close();
+
+        // 4. Test failing `refresh` when getting the last message IDs. The topic2 has no available messages.
+        tv2.close();
+        try {
+            tv2.refresh();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AlreadyClosedException);
+        }
+        Awaitility.await().untilAsserted(() -> assertTrue(completedExceptionally.get()));
+    }
+
     @Test(timeOut = 30 * 1000)
     public void testTableView() throws Exception {
         String topic = "persistent://public/default/tableview-test";
@@ -391,7 +515,7 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
 
         // publish encrypted messages
         int count = 20;
-        Set<String> keys = this.publishMessages(topic, count, false, true);
+        Set<String> keys = this.publishMessages(topic, 0, count, false, true);
 
         // TableView can read them using the private key
         @Cleanup
@@ -437,7 +561,7 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
 
         int msgCnt = 2;
-        this.publishMessages(topic, msgCnt, false, false);
+        this.publishMessages(topic, 0, msgCnt, false, false);
         Awaitility.await()
                 .atMost(5, TimeUnit.SECONDS)
                 .untilAsserted(() -> {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
index 9e5008c8bd0..767b8e1103f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -110,4 +110,38 @@ public interface TableView<T> extends Closeable {
      * @return a future that can used to track when the table view has been 
closed.
      */
     CompletableFuture<Void> closeAsync();
+
+    /**
+     * Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on
+     * the refreshed data.
+     *
+     * <p>Example usage: {@code table.refreshAsync().thenApply(__ -> table.get(key));}
+     *
+     * <p>This function retrieves the last written message in the topic and refreshes the table view accordingly.
+     * Once the refresh is complete, all subsequent reads will be performed on the refreshed data or a combination of
+     * the refreshed data and newly published data. The table view remains synchronized with any newly published data
+     * after the refresh.
+     *
+     * <pre>{@code |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|}</pre>
+     *
+     * <p>If a read occurs after the refresh (at the last published message |y:2|), it ensures that outdated data
+     * like x=1 is not obtained. However, it does not guarantee that the values will always be x=2, y=2, z=1,
+     * as the table view may receive updates with newly published data.
+     *
+     * <pre>{@code |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|->|y:3|}</pre>
+     *
+     * <p>Either y=2 or y=3 is possible. Therefore, different readers may receive different values,
+     * but all values will be equal to or newer than the data refreshed from the last call to the refresh method.
+     *
+     * @return a future that completes once the refresh is done, or completes exceptionally if it fails
+     */
+    CompletableFuture<Void> refreshAsync();
+
+    /**
+     * Refresh the table view with the latest data in the topic, blocking until the refresh completes.
+     * This is the synchronous counterpart of {@link #refreshAsync()}, which documents the consistency guarantees.
+     *
+     * @throws PulsarClientException if there is any error refreshing the table view (for example, if it was closed).
+     */
+    void refresh() throws PulsarClientException;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 64abd6d811b..d5d4174ee10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -60,6 +60,26 @@ public class TableViewImpl<T> implements TableView<T> {
     private final boolean isPersistentTopic;
     private TopicCompactionStrategy<T> compactionStrategy;
 
+    /**
+     * Pending refresh requests, mapping each future to the last message IDs of the partitions it still waits for.
+     * When a handled message reaches a partition's last message ID, that partition is removed from the inner map;
+     * once the inner map is empty, the future is completed. These futures are never completed with a timeout;
+     * they fail only if requesting the last message IDs fails or no more messages can be read (reader closed).
+     */
+    private final ConcurrentHashMap<CompletableFuture<Void>, Map<String, TopicMessageId>> pendingRefreshRequests;
+
+    /**
+     * The ID of the last handled message of each partition. It covers the following race:
+     * <ol>
+     *   <li>A refresh requests the last message IDs.</li>
+     *   <li>Before the response arrives, the reader handles the last messages, e.g. p1:1, p2:2, p3:6.</li>
+     *   <li>The response arrives: {p1:1, p2:2, p3:6}.</li>
+     *   <li>No more messages are written to this topic.</li>
+     * </ol>
+     * Without these positions the refresh would wait for messages that were already read and never complete.
+     */
+    private final ConcurrentHashMap<String, MessageId> lastReadPositions;
+
     TableViewImpl(PulsarClientImpl client, Schema<T> schema, 
TableViewConfigurationData conf) {
         this.conf = conf;
         this.isPersistentTopic = 
conf.getTopicName().startsWith(TopicDomain.persistent.toString());
@@ -69,6 +89,8 @@ public class TableViewImpl<T> implements TableView<T> {
         this.listenersMutex = new ReentrantLock();
         this.compactionStrategy =
                 TopicCompactionStrategy.load(TABLE_VIEW_TAG, 
conf.getTopicCompactionStrategyClassName());
+        this.pendingRefreshRequests = new ConcurrentHashMap<>();
+        this.lastReadPositions = new ConcurrentHashMap<>();
         ReaderBuilder<T> readerBuilder = client.newReader(schema)
                 .topic(conf.getTopicName())
                 .startMessageId(MessageId.earliest)
@@ -94,9 +116,10 @@ public class TableViewImpl<T> implements TableView<T> {
         return reader.thenCompose((reader) -> {
             if (!isPersistentTopic) {
                 readTailMessages(reader);
-                return CompletableFuture.completedFuture(reader);
+                return CompletableFuture.completedFuture(null);
             }
-            return this.readAllExistingMessages(reader);
+            return this.readAllExistingMessages(reader)
+                    .thenRun(() -> readTailMessages(reader));
         }).thenApply(__ -> this);
     }
 
@@ -180,6 +203,7 @@ public class TableViewImpl<T> implements TableView<T> {
     }
 
     private void handleMessage(Message<T> msg) {
+        lastReadPositions.put(msg.getTopicName(), msg.getMessageId());
         try {
             if (msg.hasKey()) {
                 String key = msg.getKey();
@@ -226,31 +250,104 @@ public class TableViewImpl<T> implements TableView<T> {
                     }
                 }
             }
+            checkAllFreshTask(msg);
         } finally {
             msg.release();
         }
     }
 
-    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> 
reader) {
+    @Override
+    public CompletableFuture<Void> refreshAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        reader.thenCompose(currentReader -> getLastMessageIds(currentReader).thenAccept(lastMessageIds -> {
+            // Register the request before filtering: a message handled concurrently is then caught either by
+            // filterReceivedMessages (through lastReadPositions) or by checkAllFreshTask (through this map).
+            pendingRefreshRequests.put(completableFuture, lastMessageIds);
+            filterReceivedMessages(lastMessageIds);
+            // Every partition has already been read up to its last message ID: nothing is left to wait for.
+            if (lastMessageIds.isEmpty()) {
+                pendingRefreshRequests.remove(completableFuture);
+                completableFuture.complete(null);
+            }
+        })).exceptionally(throwable -> {
+            completableFuture.completeExceptionally(throwable);
+            pendingRefreshRequests.remove(completableFuture);
+            return null;
+        });
+        return completableFuture;
+    }
+
+    /**
+     * Blocks until {@link #refreshAsync()} completes, translating failures into a {@link PulsarClientException}.
+     */
+    @Override
+    public void refresh() throws PulsarClientException {
+        try {
+            refreshAsync().get();
+        } catch (InterruptedException e) {
+            // Preserve the caller's interrupt status instead of silently clearing it.
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
+        }
+    }
+
+    /**
+     * Reads the messages that already exist in the topic: fetches the last message ID of every partition, then
+     * reads until each partition reaches it (or no more messages are available). The returned future completes
+     * when this initial read is done; starting the tail read is left to the caller.
+     */
+    private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();
 
-        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
-            Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();
-            lastMessageIds.forEach(topicMessageId -> {
-                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
-            });
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        getLastMessageIds(reader).thenAccept(maxMessageIds -> {
             readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        future.thenAccept(__ -> readTailMessages(reader));
         return future;
     }
 
-    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Reader<T>> future, long startTime,
+    /**
+     * Fetches the last message ID of every partition, keyed by its owner topic. A concurrent map is returned
+     * because entries are removed later as partitions catch up, possibly from other threads.
+     */
+    private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIds(Reader<T> reader) {
+        return reader.getLastMessageIdsAsync().thenApply(ids -> {
+            Map<String, TopicMessageId> byPartition = new ConcurrentHashMap<>();
+            for (TopicMessageId id : ids) {
+                byPartition.put(id.getOwnerTopic(), id);
+            }
+            return byPartition;
+        });
+    }
+
+    /**
+     * Drops from {@code lastMessageIds} every partition whose last handled message (tracked in lastReadPositions)
+     * is already at or beyond its last message ID, so a refresh does not wait for messages that were already read.
+     */
+    private void filterReceivedMessages(Map<String, TopicMessageId> lastMessageIds) {
+        // removeIf on the entry set is safe for any Map implementation, whereas calling remove() inside forEach
+        // only works because the maps passed here happen to be ConcurrentHashMaps.
+        lastMessageIds.entrySet().removeIf(entry -> {
+            MessageId readPosition = lastReadPositions.get(entry.getKey());
+            return readPosition != null && entry.getValue().compareTo(readPosition) <= 0;
+        });
+    }
+
+    /**
+     * Records that {@code messageId} was handled for {@code topicName}: once it reaches that partition's target
+     * in {@code maxMessageIds}, the partition is dropped. When no partition is left, {@code future} is completed.
+     *
+     * @return {@code true} when no partition is left and {@code future} has been completed, {@code false} otherwise
+     */
+    private boolean checkFreshTask(Map<String, TopicMessageId> maxMessageIds, CompletableFuture<Void> future,
+                                   MessageId messageId, String topicName) {
+        // Messages read through a multi-consumer/multi-reader are wrapped in TopicMessageImpl.
+        TopicMessageId target = maxMessageIds.get(topicName);
+        boolean partitionCaughtUp = target != null && messageId.compareTo(target) >= 0;
+        if (partitionCaughtUp) {
+            // This partition has been read completely.
+            maxMessageIds.remove(topicName);
+        }
+        if (!maxMessageIds.isEmpty()) {
+            return false;
+        }
+        future.complete(null);
+        return true;
+    }
+
+    /**
+     * Feeds a handled message to every pending refresh request and drops the requests it completes.
+     */
+    private void checkAllFreshTask(Message<T> msg) {
+        final String partition = msg.getTopicName();
+        final MessageId position = msg.getMessageId();
+        pendingRefreshRequests.forEach((request, remaining) -> {
+            if (checkFreshTask(remaining, request, position, partition)) {
+                pendingRefreshRequests.remove(request);
+            }
+        });
+    }
+
+    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Void> future, long startTime,
                                          AtomicLong messagesRead, Map<String, 
TopicMessageId> maxMessageIds) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
@@ -258,17 +355,12 @@ public class TableViewImpl<T> implements TableView<T> {
                        reader.readNextAsync()
                                .thenAccept(msg -> {
                                   messagesRead.incrementAndGet();
-                                  // We need remove the partition from the 
maxMessageIds map
-                                  // once the partition has been read 
completely.
-                                  TopicMessageId maxMessageId = 
maxMessageIds.get(msg.getTopicName());
-                                  if (maxMessageId != null && 
msg.getMessageId().compareTo(maxMessageId) >= 0) {
-                                      maxMessageIds.remove(msg.getTopicName());
-                                  }
+                                  String topicName = msg.getTopicName();
+                                  MessageId messageId = msg.getMessageId();
                                   handleMessage(msg);
-                                  if (maxMessageIds.isEmpty()) {
-                                      future.complete(reader);
-                                  } else {
-                                      readAllExistingMessages(reader, future, 
startTime, messagesRead, maxMessageIds);
+                                  if (!checkFreshTask(maxMessageIds, future, 
messageId, topicName)) {
+                                      readAllExistingMessages(reader, future, 
startTime,
+                                              messagesRead, maxMessageIds);
                                   }
                                }).exceptionally(ex -> {
                                    if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
@@ -289,7 +381,7 @@ public class TableViewImpl<T> implements TableView<T> {
                                reader.getTopic(),
                                messagesRead,
                                durationMillis / 1000.0);
-                       future.complete(reader);
+                       future.complete(null);
                    }
                 });
     }
@@ -303,6 +395,11 @@ public class TableViewImpl<T> implements TableView<T> {
                     if (ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
                         log.error("Reader {} was closed while reading tail 
messages.",
                                 reader.getTopic(), ex);
+                        // Fail all refresh request when no more messages can 
be read.
+                        pendingRefreshRequests.keySet().forEach(future -> {
+                            pendingRefreshRequests.remove(future);
+                            future.completeExceptionally(ex);
+                        });
                     } else {
                         // Retrying on the other exceptions such as 
NotConnectedException
                         try {

Reply via email to