This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d58579bfa [improve][client] Add unified newTableView method in PulsarClient (#19048)
b4d58579bfa is described below

commit b4d58579bfa53dd89e3207e2264d0bc4f8dce3c6
Author: Ruguo Yu <[email protected]>
AuthorDate: Tue Jan 17 16:53:17 2023 +0800

    [improve][client] Add unified newTableView method in PulsarClient (#19048)
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 20 +++++++++-
 .../org/apache/pulsar/client/api/PulsarClient.java | 46 +++++++++++++++++++++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 14 +++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 9c1779839a2..b6569d6a21d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -173,6 +173,24 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testNewTableView() throws Exception {
+        String topic = "persistent://public/default/new-tableview-test";
+        admin.topics().createPartitionedTopic(topic, 2);
+        Set<String> keys = this.publishMessages(topic, 10, false);
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableView()
+                .topic(topic)
+                .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
+                .create();
+        tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
+        Awaitility.await().untilAsserted(() -> {
+            log.info("Current tv size: {}", tv.size());
+            assertEquals(tv.size(), 10);
+        });
+        assertEquals(tv.keySet(), keys);
+    }
+
     @Test(timeOut = 30 * 1000, dataProvider = "topicDomain")
     public void testTableViewUpdatePartitions(String topicDomain) throws Exception {
         String topic = topicDomain + "://public/default/tableview-test-update-partitions";
@@ -242,7 +260,7 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         tv.close();
 
         @Cleanup
-        TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+        TableView<String> tv1 = pulsarClient.newTableView(Schema.STRING)
                 .topic(topic)
                 .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
                 .create();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 05a21fa1585..90095300cca 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -230,7 +230,7 @@ public interface PulsarClient extends Closeable {
      *
      * <p>Example:
      * <pre>{@code
-     *  TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+     *  TableView<byte[]> tableView = client.newTableViewBuilder(Schema.BYTES)
      *            .topic("my-topic")
      *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
      *            .create();
@@ -240,9 +240,53 @@ public interface PulsarClient extends Closeable {
      *
      * @param schema provide a way to convert between serialized data and domain objects
      * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+     * @deprecated Use {@link PulsarClient#newTableView(Schema)} to build and configure a {@link TableViewBuilder}
+     * instance
      */
+    @Deprecated
     <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema);
 
+    /**
+     * Create a table view builder for subscribing on a specific topic.
+     *
+     * <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
+     * be ignored.
+     *
+     * <p>Example:
+     * <pre>{@code
+     *  TableView<byte[]> tableView = client.newTableView()
+     *            .topic("my-topic")
+     *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+     *            .create();
+     *
+     *  tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+     * }</pre>
+     *
+     * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+     */
+    TableViewBuilder<byte[]> newTableView();
+
+    /**
+     * Create a table view builder with a specific schema for subscribing on a specific topic.
+     *
+     * <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
+     * be ignored.
+     *
+     * <p>Example:
+     * <pre>{@code
+     *  TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+     *            .topic("my-topic")
+     *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+     *            .create();
+     *
+     *  tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+     * }</pre>
+     *
+     * @param schema provide a way to convert between serialized data and domain objects
+     * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+     */
+    <T> TableViewBuilder<T> newTableView(Schema<T> schema);
+
     /**
      * Update the service URL this client is using.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d964328d59c..eba7ff91f65 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -305,11 +305,25 @@ public class PulsarClientImpl implements PulsarClient {
         return new ReaderBuilderImpl<>(this, schema);
     }
 
+    /**
+     * @deprecated use {@link #newTableView(Schema)} instead.
+     */
     @Override
+    @Deprecated
     public <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema) {
         return new TableViewBuilderImpl<>(this, schema);
     }
 
+    @Override
+    public TableViewBuilder<byte[]> newTableView() {
+        return new TableViewBuilderImpl<>(this, Schema.BYTES);
+    }
+
+    @Override
+    public <T> TableViewBuilder<T> newTableView(Schema<T> schema) {
+        return new TableViewBuilderImpl<>(this, schema);
+    }
+
     public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
         return createProducerAsync(conf, Schema.BYTES, null);
     }

Reply via email to