This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b4d58579bfa [improve][client] Add unified newTableView method in
PulsarClient (#19048)
b4d58579bfa is described below
commit b4d58579bfa53dd89e3207e2264d0bc4f8dce3c6
Author: Ruguo Yu <[email protected]>
AuthorDate: Tue Jan 17 16:53:17 2023 +0800
[improve][client] Add unified newTableView method in PulsarClient (#19048)
---
.../apache/pulsar/client/impl/TableViewTest.java | 20 +++++++++-
.../org/apache/pulsar/client/api/PulsarClient.java | 46 +++++++++++++++++++++-
.../pulsar/client/impl/PulsarClientImpl.java | 14 +++++++
3 files changed, 78 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 9c1779839a2..b6569d6a21d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -173,6 +173,24 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testNewTableView() throws Exception {
+ String topic = "persistent://public/default/new-tableview-test";
+ admin.topics().createPartitionedTopic(topic, 2);
+ Set<String> keys = this.publishMessages(topic, 10, false);
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableView()
+ .topic(topic)
+ .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
+ .create();
+ tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
+ Awaitility.await().untilAsserted(() -> {
+ log.info("Current tv size: {}", tv.size());
+ assertEquals(tv.size(), 10);
+ });
+ assertEquals(tv.keySet(), keys);
+ }
+
@Test(timeOut = 30 * 1000, dataProvider = "topicDomain")
public void testTableViewUpdatePartitions(String topicDomain) throws
Exception {
String topic = topicDomain +
"://public/default/tableview-test-update-partitions";
@@ -242,7 +260,7 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
tv.close();
@Cleanup
- TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+ TableView<String> tv1 = pulsarClient.newTableView(Schema.STRING)
.topic(topic)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.create();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 05a21fa1585..90095300cca 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -230,7 +230,7 @@ public interface PulsarClient extends Closeable {
*
* <p>Example:
* <pre>{@code
- * TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+ * TableView<byte[]> tableView = client.newTableViewBuilder(Schema.BYTES)
* .topic("my-topic")
* .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
* .create();
@@ -240,9 +240,53 @@ public interface PulsarClient extends Closeable {
*
* @param schema provide a way to convert between serialized data and
domain objects
* @return a {@link TableViewBuilder} object to configure and construct
the {@link TableView} instance
+ * @deprecated Use {@link PulsarClient#newTableView(Schema)} to build and
configure a {@link TableViewBuilder}
+ * instance
*/
+ @Deprecated
<T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema);
+ /**
+ * Create a table view builder for subscribing on a specific topic.
+ *
+ * <p>The TableView provides a key-value map view of a compacted topic.
Messages without keys will
+ * be ignored.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * TableView<byte[]> tableView = client.newTableView()
+ * .topic("my-topic")
+ * .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ * .create();
+ *
+ * tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+ * }</pre>
+ *
+ * @return a {@link TableViewBuilder} object to configure and construct
the {@link TableView} instance
+ */
+ TableViewBuilder<byte[]> newTableView();
+
+ /**
+ * Create a table view builder with a specific schema for subscribing on a
specific topic.
+ *
+ * <p>The TableView provides a key-value map view of a compacted topic.
Messages without keys will
+ * be ignored.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+ * .topic("my-topic")
+ * .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ * .create();
+ *
+ * tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+ * }</pre>
+ *
+ * @param schema provide a way to convert between serialized data and
domain objects
+ * @return a {@link TableViewBuilder} object to configure and construct
the {@link TableView} instance
+ */
+ <T> TableViewBuilder<T> newTableView(Schema<T> schema);
+
/**
* Update the service URL this client is using.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d964328d59c..eba7ff91f65 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -305,11 +305,25 @@ public class PulsarClientImpl implements PulsarClient {
return new ReaderBuilderImpl<>(this, schema);
}
+ /**
+ * @deprecated use {@link #newTableView(Schema)} instead.
+ */
@Override
+ @Deprecated
public <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema) {
return new TableViewBuilderImpl<>(this, schema);
}
+ @Override
+ public TableViewBuilder<byte[]> newTableView() {
+ return new TableViewBuilderImpl<>(this, Schema.BYTES);
+ }
+
+ @Override
+ public <T> TableViewBuilder<T> newTableView(Schema<T> schema) {
+ return new TableViewBuilderImpl<>(this, schema);
+ }
+
public CompletableFuture<Producer<byte[]>>
createProducerAsync(ProducerConfigurationData conf) {
return createProducerAsync(conf, Schema.BYTES, null);
}