This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95d24ac4550 [feat][client] Introduce Refresh API in the TableView
(#21417)
95d24ac4550 is described below
commit 95d24ac4550253c906913d77ccb7dfd3a4cd31d3
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Mar 15 16:57:43 2024 +0800
[feat][client] Introduce Refresh API in the TableView (#21417)
Master https://github.com/apache/pulsar/pull/21271
### Motivation
The proposal will introduce a new API to refresh the table view with the
latest written data on the topic, ensuring that all subsequent reads are based
on the refreshed data.
---
.../apache/pulsar/client/impl/TableViewTest.java | 134 +++++++++++++++++++-
.../org/apache/pulsar/client/api/TableView.java | 34 +++++
.../apache/pulsar/client/impl/TableViewImpl.java | 141 +++++++++++++++++----
3 files changed, 282 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 523360884c1..61ab4de8a32 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -38,10 +38,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
@@ -49,6 +51,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
@@ -101,10 +104,11 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
}
private Set<String> publishMessages(String topic, int count, boolean
enableBatch) throws Exception {
- return publishMessages(topic, count, enableBatch, false);
+ return publishMessages(topic, 0, count, enableBatch, false);
}
- private Set<String> publishMessages(String topic, int count, boolean
enableBatch, boolean enableEncryption) throws Exception {
+ private Set<String> publishMessages(String topic, int keyStartPosition,
int count, boolean enableBatch,
+ boolean enableEncryption) throws
Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
@@ -124,7 +128,7 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
- for (int i = 0; i < count; i++) {
+ for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
String key = "key"+ i;
byte[] data = ("my-message-" + i).getBytes();
lastFuture =
producer.newMessage().key(key).value(data).sendAsync();
@@ -136,6 +140,126 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
return keys;
}
+ @DataProvider(name = "partition")
+ public static Object[][] partition () {
+ return new Object[][] {
+ { 3 }, { 0 }
+ };
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the rate of reading messages.
+ * 2. Send some messages
+     * 3. Call the new `refresh` API; it waits until all the messages have been
read.
+ * Case2:
+ * 1. No new messages.
+ * 2. Call new `refresh` API, it will be completed immediately.
+ * Case3:
+     * 1. Multi-partition topic: p1 and p2 have new messages, p3 has no new
messages.
+     * 2. Call the new `refresh` API; it completes after the new messages are read.
+ */
+ @Test(dataProvider = "partition")
+ public void testRefreshAPI(int partition) throws Exception {
+ // 1. Prepare resource.
+ String topic = "persistent://public/default/testRefreshAPI" +
RandomUtils.nextLong();
+ if (partition == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, partition);
+ }
+
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic)
+ .create();
+ // 2. Add a listen action to provide the test environment.
+        // The listen action is triggered every time there is an incoming
message.
+        // This is a sync operation, so sleeping in the listen action slows
down the reading rate of messages.
+ tv.listen((k, v) -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 3. Send 20 messages. After refresh, all the messages should be
received.
+ int count = 20;
+ Set<String> keys = this.publishMessages(topic, count, false);
+        // After all the messages are sent, the table view will take at least
2 seconds to receive them.
+        // Without the refresh operation, not all the messages would have been
received at this point.
+ tv.refresh();
+ // The key of each message is different.
+ assertEquals(tv.size(), count);
+ assertEquals(tv.keySet(), keys);
+        // 4. Test that the refresh operation completes when there is a
partition with no new messages
+        // or when there are no new messages on a non-partitioned topic.
+ if (partition > 0) {
+ publishMessages(topic, partition - 1, count, false, false);
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
+ assertEquals(tv.size(), count + partition - 1);
+ } else {
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages.
+     * 3. Call new `refresh` API.
+     * 4. Close the reader of the tableview.
+     * 5. The refresh operation will fail with an `AlreadyClosedException`.
+     * Case2:
+     * 1. Close the reader of the tableview.
+     * 2. Call new `refresh` API.
+     * 3. The refresh operation will fail with an `AlreadyClosedException`.
+ */
+ @Test
+ public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws
Exception {
+ // 1. Prepare resource.
+ String topic1 =
"persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
+ admin.topics().createNonPartitionedTopic(topic1);
+ String topic2 =
"persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
+ admin.topics().createNonPartitionedTopic(topic2);
+ @Cleanup
+ TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic1)
+ .create();
+ @Cleanup
+ TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic1)
+ .create();
+ // 2. Slow down the rate of reading messages.
+ tv1.listen((k, v) -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ publishMessages(topic1, 20, false);
+ AtomicBoolean completedExceptionally = new AtomicBoolean(false);
+ // 3. Test failing `refresh` in the reading process.
+ tv1.refreshAsync().exceptionally(ex -> {
+ if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
+ completedExceptionally.set(true);
+ }
+ return null;
+ });
+ tv1.close();
+
+        // 4. Test failing `refresh` when getting the last message IDs. Topic2 has
no available messages.
+ tv2.close();
+ try {
+ tv2.refresh();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof
PulsarClientException.AlreadyClosedException);
+ }
+ Awaitility.await().untilAsserted(() ->
assertTrue(completedExceptionally.get()));
+ }
+
@Test(timeOut = 30 * 1000)
public void testTableView() throws Exception {
String topic = "persistent://public/default/tableview-test";
@@ -391,7 +515,7 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
// publish encrypted messages
int count = 20;
- Set<String> keys = this.publishMessages(topic, count, false, true);
+ Set<String> keys = this.publishMessages(topic, 0, count, false, true);
// TableView can read them using the private key
@Cleanup
@@ -437,7 +561,7 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
int msgCnt = 2;
- this.publishMessages(topic, msgCnt, false, false);
+ this.publishMessages(topic, 0, msgCnt, false, false);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
index 9e5008c8bd0..767b8e1103f 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -110,4 +110,38 @@ public interface TableView<T> extends Closeable {
* @return a future that can used to track when the table view has been
closed.
*/
CompletableFuture<Void> closeAsync();
+
+ /**
+ * Refresh the table view with the latest data in the topic, ensuring that
all subsequent reads are based on
+ * the refreshed data.
+ *
+ * Example usage:
+ *
+ * table.refreshAsync().thenApply(__ -> table.get(key));
+ *
+ * This function retrieves the last written message in the topic and
refreshes the table view accordingly.
+ * Once the refresh is complete, all subsequent reads will be performed on
the refreshed data or a combination of
+ * the refreshed data and newly published data. The table view remains
synchronized with any newly published data
+ * after the refresh.
+ *
+ * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|
+ *
+ * If a read occurs after the refresh (at the last published message
|y:2|), it ensures that outdated data like x=1
+ * is not obtained. However, it does not guarantee that the values will
always be x=2, y=2, z=1,
+ * as the table view may receive updates with newly published data.
+ *
+ * |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| -> |y:3|
+ *
+     * Either y=2 or y=3 is possible. Therefore, different readers may receive
different values,
+ * but all values will be equal to or newer than the data refreshed from
the last call to the refresh method.
+ */
+ CompletableFuture<Void> refreshAsync();
+
+ /**
+ * Refresh the table view with the latest data in the topic, ensuring that
all subsequent reads are based on
+ * the refreshed data.
+ *
+ * @throws PulsarClientException if there is any error refreshing the
table view.
+ */
+ void refresh() throws PulsarClientException;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 64abd6d811b..d5d4174ee10 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -60,6 +60,26 @@ public class TableViewImpl<T> implements TableView<T> {
private final boolean isPersistentTopic;
private TopicCompactionStrategy<T> compactionStrategy;
+ /**
+     * Stores the pending refresh tasks. When a partition has been read up to the
position recorded in the value map,
+     * that position is removed from the value map. Once the value map is empty,
the future used as the key is completed.
+ * There should be no timeout exception here, because the caller can only
retry for TimeoutException.
+ * It will only be completed exceptionally when no more messages can be
read.
+ */
+ private final ConcurrentHashMap<CompletableFuture<Void>, Map<String,
TopicMessageId>> pendingRefreshRequests;
+
+ /**
+     * This map stores the last read position of each partition. It is needed for the
following case:
+     * <p>
+     * 1. Get last message ID.
+     * 2. Receive message p1-1:1, p2-1:1, p2-1:2, p3-1:1
+     * 3. Receive response of step1 {|p1-1:1|p2-2:2|p3-3:6|}
+     * 4. No more messages are written to this topic.
+     * Without this map, the refresh operation would never be completed.
+ * </p>
+ */
+ private final ConcurrentHashMap<String, MessageId> lastReadPositions;
+
TableViewImpl(PulsarClientImpl client, Schema<T> schema,
TableViewConfigurationData conf) {
this.conf = conf;
this.isPersistentTopic =
conf.getTopicName().startsWith(TopicDomain.persistent.toString());
@@ -69,6 +89,8 @@ public class TableViewImpl<T> implements TableView<T> {
this.listenersMutex = new ReentrantLock();
this.compactionStrategy =
TopicCompactionStrategy.load(TABLE_VIEW_TAG,
conf.getTopicCompactionStrategyClassName());
+ this.pendingRefreshRequests = new ConcurrentHashMap<>();
+ this.lastReadPositions = new ConcurrentHashMap<>();
ReaderBuilder<T> readerBuilder = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
@@ -94,9 +116,10 @@ public class TableViewImpl<T> implements TableView<T> {
return reader.thenCompose((reader) -> {
if (!isPersistentTopic) {
readTailMessages(reader);
- return CompletableFuture.completedFuture(reader);
+ return CompletableFuture.completedFuture(null);
}
- return this.readAllExistingMessages(reader);
+ return this.readAllExistingMessages(reader)
+ .thenRun(() -> readTailMessages(reader));
}).thenApply(__ -> this);
}
@@ -180,6 +203,7 @@ public class TableViewImpl<T> implements TableView<T> {
}
private void handleMessage(Message<T> msg) {
+ lastReadPositions.put(msg.getTopicName(), msg.getMessageId());
try {
if (msg.hasKey()) {
String key = msg.getKey();
@@ -226,31 +250,104 @@ public class TableViewImpl<T> implements TableView<T> {
}
}
}
+ checkAllFreshTask(msg);
} finally {
msg.release();
}
}
- private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T>
reader) {
+ @Override
+ public CompletableFuture<Void> refreshAsync() {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ reader.thenCompose(reader ->
getLastMessageIds(reader).thenAccept(lastMessageIds -> {
+            // After getting the response of lastMessageIds, put the future and
result into `pendingRefreshRequests`
+            // and then filter out partitions that have already been read up to the
lastMessageID.
+ pendingRefreshRequests.put(completableFuture, lastMessageIds);
+ filterReceivedMessages(lastMessageIds);
+            // If there are no new messages, the refresh operation can be
completed right now.
+ if (lastMessageIds.isEmpty()) {
+ pendingRefreshRequests.remove(completableFuture);
+ completableFuture.complete(null);
+ }
+ })).exceptionally(throwable -> {
+ completableFuture.completeExceptionally(throwable);
+ pendingRefreshRequests.remove(completableFuture);
+ return null;
+ });
+ return completableFuture;
+ }
+
+ @Override
+ public void refresh() throws PulsarClientException {
+ try {
+ refreshAsync().get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
+ }
+ }
+
+ private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();
- CompletableFuture<Reader<T>> future = new CompletableFuture<>();
- reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
- Map<String, TopicMessageId> maxMessageIds = new
ConcurrentHashMap<>();
- lastMessageIds.forEach(topicMessageId -> {
- maxMessageIds.put(topicMessageId.getOwnerTopic(),
topicMessageId);
- });
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getLastMessageIds(reader).thenAccept(maxMessageIds -> {
readAllExistingMessages(reader, future, startTime, messagesRead,
maxMessageIds);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
- future.thenAccept(__ -> readTailMessages(reader));
return future;
}
- private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Reader<T>> future, long startTime,
+ private CompletableFuture<Map<String, TopicMessageId>>
getLastMessageIds(Reader<T> reader) {
+ return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
+ Map<String, TopicMessageId> maxMessageIds = new
ConcurrentHashMap<>();
+ lastMessageIds.forEach(topicMessageId -> {
+ maxMessageIds.put(topicMessageId.getOwnerTopic(),
topicMessageId);
+ });
+ return maxMessageIds;
+ });
+ }
+
+ private void filterReceivedMessages(Map<String, TopicMessageId>
lastMessageIds) {
+        // Both `lastMessageIds` and `lastReadPositions` are concurrency-safe data
types.
+ lastMessageIds.forEach((partition, lastMessageId) -> {
+ MessageId messageId = lastReadPositions.get(partition);
+ if (messageId != null && lastMessageId.compareTo(messageId) <= 0) {
+ lastMessageIds.remove(partition);
+ }
+ });
+ }
+
+ private boolean checkFreshTask(Map<String, TopicMessageId> maxMessageIds,
CompletableFuture<Void> future,
+ MessageId messageId, String topicName) {
+ // The message received from multi-consumer/multi-reader is processed
to TopicMessageImpl.
+ TopicMessageId maxMessageId = maxMessageIds.get(topicName);
+        // We need to remove the partition from the maxMessageIds map
+        // once the partition has been read completely.
+ if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) {
+ maxMessageIds.remove(topicName);
+ }
+ if (maxMessageIds.isEmpty()) {
+ future.complete(null);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void checkAllFreshTask(Message<T> msg) {
+ pendingRefreshRequests.forEach((future, maxMessageIds) -> {
+ String topicName = msg.getTopicName();
+ MessageId messageId = msg.getMessageId();
+ if (checkFreshTask(maxMessageIds, future, messageId, topicName)) {
+ pendingRefreshRequests.remove(future);
+ }
+ });
+ }
+
+ private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Void> future, long startTime,
AtomicLong messagesRead, Map<String,
TopicMessageId> maxMessageIds) {
reader.hasMessageAvailableAsync()
.thenAccept(hasMessage -> {
@@ -258,17 +355,12 @@ public class TableViewImpl<T> implements TableView<T> {
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
- // We need remove the partition from the
maxMessageIds map
- // once the partition has been read
completely.
- TopicMessageId maxMessageId =
maxMessageIds.get(msg.getTopicName());
- if (maxMessageId != null &&
msg.getMessageId().compareTo(maxMessageId) >= 0) {
- maxMessageIds.remove(msg.getTopicName());
- }
+ String topicName = msg.getTopicName();
+ MessageId messageId = msg.getMessageId();
handleMessage(msg);
- if (maxMessageIds.isEmpty()) {
- future.complete(reader);
- } else {
- readAllExistingMessages(reader, future,
startTime, messagesRead, maxMessageIds);
+ if (!checkFreshTask(maxMessageIds, future,
messageId, topicName)) {
+ readAllExistingMessages(reader, future,
startTime,
+ messagesRead, maxMessageIds);
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
@@ -289,7 +381,7 @@ public class TableViewImpl<T> implements TableView<T> {
reader.getTopic(),
messagesRead,
durationMillis / 1000.0);
- future.complete(reader);
+ future.complete(null);
}
});
}
@@ -303,6 +395,11 @@ public class TableViewImpl<T> implements TableView<T> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
log.error("Reader {} was closed while reading tail
messages.",
reader.getTopic(), ex);
+                    // Fail all refresh requests when no more messages can
be read.
+ pendingRefreshRequests.keySet().forEach(future -> {
+ pendingRefreshRequests.remove(future);
+ future.completeExceptionally(ex);
+ });
} else {
// Retrying on the other exceptions such as
NotConnectedException
try {