This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 51d11db  feat: Support table view for C client. (#294)
51d11db is described below

commit 51d11db0c6ed32f15ab06aca9a043ca44c9ecaea
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jul 7 16:22:35 2023 +0800

    feat: Support table view for C client. (#294)
---
 include/pulsar/TableView.h                  |   2 +-
 include/pulsar/c/client.h                   |  51 ++++++++
 include/pulsar/c/table_view.h               | 141 ++++++++++++++++++++++
 include/pulsar/c/table_view_configuration.h |  48 ++++++++
 lib/c/c_Client.cc                           |  28 +++++
 lib/c/c_TableView.cc                        |  89 ++++++++++++++
 lib/c/c_TableViewConfiguration.cc           |  46 ++++++++
 lib/c/c_structs.h                           |   8 ++
 tests/c/c_TableViewTest.cc                  | 176 ++++++++++++++++++++++++++++
 9 files changed, 588 insertions(+), 1 deletion(-)

diff --git a/include/pulsar/TableView.h b/include/pulsar/TableView.h
index 2c2898b..d0c278a 100644
--- a/include/pulsar/TableView.h
+++ b/include/pulsar/TableView.h
@@ -51,7 +51,7 @@ class PULSAR_PUBLIC TableView {
      * TableView view;
      * std::string value;
      * while (true) {
-     *     if (view.retrieveValue("key")) {
+     *     if (view.retrieveValue("key", value)) {
      *         std::cout << "value is updated to: " << value;
      *     } else {
      *         // sleep for a while or print the message that value is not 
updated
diff --git a/include/pulsar/c/client.h b/include/pulsar/c/client.h
index 3a53c01..5fdd287 100644
--- a/include/pulsar/c/client.h
+++ b/include/pulsar/c/client.h
@@ -30,6 +30,8 @@
 #include <pulsar/c/reader_configuration.h>
 #include <pulsar/c/result.h>
 #include <pulsar/c/string_list.h>
+#include <pulsar/c/table_view.h>
+#include <pulsar/c/table_view_configuration.h>
 #include <pulsar/defines.h>
 
 #ifdef __cplusplus
@@ -47,6 +49,7 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result 
result, pulsar_pro
 
 typedef void (*pulsar_subscribe_callback)(pulsar_result result, 
pulsar_consumer_t *consumer, void *ctx);
 typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t 
*reader, void *ctx);
+typedef void (*pulsar_table_view_callback)(pulsar_result result, 
pulsar_table_view_t *tableView, void *ctx);
 typedef void (*pulsar_get_partitions_callback)(pulsar_result result, 
pulsar_string_list_t *partitions,
                                                void *ctx);
 
@@ -172,6 +175,54 @@ PULSAR_PUBLIC void 
pulsar_client_create_reader_async(pulsar_client_t *client, co
                                                      const pulsar_message_id_t 
*startMessageId,
                                                      
pulsar_reader_configuration_t *conf,
                                                      pulsar_reader_callback 
callback, void *ctx);
+/**
+ * Create a table view with given {@code table_view_configuration} for 
specified topic.
+ *
+ * The TableView provides a key-value map view of a compacted topic. Messages 
without keys will
+ * be ignored.
+ *
+ * NOTE:
+ * When the result in the callback is `ResultOk`, `*c_tableView` will point to 
the memory that
+ * is allocated internally. You have to call `pulsar_table_view_free` to free 
it.
+ *
+ * Example:
+ * ```c
+ * pulsar_table_view_configuration_t *table_view_conf = 
pulsar_table_view_configuration_create();
+ * pulsar_table_view_configuration_set_subscription_name(table_view_conf, 
sub_name);
+ * pulsar_table_view_t *table_view;
+ * pulsar_result result = pulsar_client_create_table_view(client, topic_name, 
table_view_conf, &table_view);
+ *
+ * // do something...
+ *
+ * pulsar_table_view_close(table_view);
+ * pulsar_table_view_free(table_view);
+ * pulsar_table_view_configuration_free(table_view_conf);
+ *
+ * ```
+ *
+ * @param topic The name of the topic.
+ * @param conf The {@code table_view_configuration} pointer.
+ * @param c_tableView The pointer of the table_view pointer
+ * @return Returned when the table_view is successfully linked to the topic 
and the map is built from a
+ * message that already exists.
+ */
+PULSAR_PUBLIC pulsar_result pulsar_client_create_table_view(pulsar_client_t 
*client, const char *topic,
+                                                            
pulsar_table_view_configuration_t *conf,
+                                                            
pulsar_table_view_t **c_tableView);
+
+/**
+ * Async Create a table view with given {@code table_view_configuration} for 
specified topic.
+ * @param topic The name of the topic.
+ * @param conf The {@code table_view_configuration} pointer.
+ * @param callback
+ * 1. When the result in the callback is `ResultOk`, `tableView` in the 
callback will point to the memory that
+ * is allocated internally. You have to call `pulsar_table_view_free` to free 
it.
+ * 2. If the result in the callback is not `ResultOk`, `tableView` in the 
callback will be nullptr.
+ * @param ctx
+ */
+PULSAR_PUBLIC void pulsar_client_create_table_view_async(pulsar_client_t 
*client, const char *topic,
+                                                         
pulsar_table_view_configuration_t *conf,
+                                                         
pulsar_table_view_callback callback, void *ctx);
 
 PULSAR_PUBLIC pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t 
*client, const char *topic,
                                                                
pulsar_string_list_t **partitions);
diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h
new file mode 100644
index 0000000..87eed3e
--- /dev/null
+++ b/include/pulsar/c/table_view.h
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/defines.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/message.h>
+#include <pulsar/c/messages.h>
+#include <pulsar/c/result.h>
+#include <stdint.h>
+
+typedef struct _pulsar_table_view pulsar_table_view_t;
+
+typedef void (*pulsar_table_view_action)(const char *key, const void *value, 
size_t value_size, void *ctx);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
+
+/**
+ * Move the latest value associated with the key.
+ *
+ * NOTE:
+ * 1. Once the value has been retrieved successfully,
+ * the associated value will be removed from the table view until next time 
the value is updated.
+ * 2. Once the value has been retrieved successfully, `*value` will point to 
the memory that is allocated
+ * internally. You have to call `free(value)` to free it.
+ *
+ * Example:
+ *
+ * ```c
+ * pulsar_table_view_t *table_view;
+ * void* value;
+ * size_t value_size;
+ * while (true) {
+ *     if (pulsar_table_view_retrieve_value(table_view, "key", &value, 
&value_size)) {
+ *         for (size_t i = 0; i < value_size; i++) {
+ *             printf("0x%02x%c", ((char*) value)[i], (i + 1 == value_size) ? 
'\n': ' ');
+ *         }
+ *     } else {
+ *         // sleep for a while or print the message that value is not updated
+ *     }
+ * }
+ * free(value);
+ * ```
+ *
+ * @param table_view
+ * @param key
+ * @param value the value associated with the key
+ * @return true if there is an associated value of the key, otherwise false
+ */
+PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t 
*table_view, const char *key,
+                                                    void **value, size_t 
*value_size);
+
+/**
+ * It's similar with `pulsar_table_view_retrieve_value` except the associated 
value not will be removed from
+ * the table view.
+ *
+ * NOTE:
+ * Once the value has been get successfully, `*value` will point to the memory 
that is allocated internally.
+ * You have to call `free(value)` to free it.
+ *
+ * @param table_view
+ * @param key
+ * @param value the value associated with the key
+ * @return true if there is an associated value of the key, otherwise false
+ */
+PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t 
*table_view, const char *key, void **value,
+                                               size_t *value_size);
+
+/**
+ * Check if the key exists in the table view.
+ * @param table_view
+ * @param key
+ * @return true if the key exists in the table view
+ */
+PULSAR_PUBLIC bool pulsar_table_view_contain_key(pulsar_table_view_t 
*table_view, const char *key);
+
+/**
+ * Get the size of the elements.
+ * @param table_view
+ * @return
+ */
+PULSAR_PUBLIC int pulsar_table_view_size(pulsar_table_view_t *table_view);
+
+/**
+ * Performs the given action for each entry in this map until all entries have 
been processed or the
+ * action throws an exception.
+ */
+PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view,
+                                              pulsar_table_view_action action, 
void *ctx);
+
+/**
+ * Performs the given action for each entry in this map until all entries have 
been processed and
+ * register the callback, which will be called each time a key-value pair is 
updated.
+ */
+PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t 
*table_view,
+                                                         
pulsar_table_view_action action, void *ctx);
+
+/**
+ * Free the table view.
+ * @param table_view
+ */
+PULSAR_PUBLIC void pulsar_table_view_free(pulsar_table_view_t *table_view);
+
+/**
+ * Close the table view and stop the broker to push more messages
+ * @param table_view
+ * @return
+ */
+PULSAR_PUBLIC pulsar_result pulsar_table_view_close(pulsar_table_view_t 
*table_view);
+
+/**
+ * Async close the table view and stop the broker to push more messages
+ * @param table_view
+ * @param callback
+ * @param ctx
+ */
+PULSAR_PUBLIC void pulsar_table_view_close_async(pulsar_table_view_t 
*table_view,
+                                                 pulsar_result_callback 
callback, void *ctx);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/include/pulsar/c/table_view_configuration.h 
b/include/pulsar/c/table_view_configuration.h
new file mode 100644
index 0000000..26389b6
--- /dev/null
+++ b/include/pulsar/c/table_view_configuration.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/defines.h>
+
+#include "producer_configuration.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_table_view_configuration 
pulsar_table_view_configuration_t;
+
+PULSAR_PUBLIC pulsar_table_view_configuration_t 
*pulsar_table_view_configuration_create();
+
+PULSAR_PUBLIC void 
pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf);
+
+PULSAR_PUBLIC void pulsar_table_view_configuration_set_schema_info(
+    pulsar_table_view_configuration_t *table_view_configuration_t, 
pulsar_schema_type schemaType,
+    const char *name, const char *schema, pulsar_string_map_t *properties);
+
+PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t, const char 
*subscription_name);
+
+PULSAR_PUBLIC const char 
*pulsar_table_view_configuration_get_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/lib/c/c_Client.cc b/lib/c/c_Client.cc
index dd1b71e..9f27538 100644
--- a/lib/c/c_Client.cc
+++ b/lib/c/c_Client.cc
@@ -192,6 +192,34 @@ void pulsar_client_create_reader_async(pulsar_client_t 
*client, const char *topi
         std::bind(&handle_reader_callback, std::placeholders::_1, 
std::placeholders::_2, callback, ctx));
 }
 
+pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const 
char *topic,
+                                              
pulsar_table_view_configuration_t *conf,
+                                              pulsar_table_view_t 
**c_tableView) {
+    pulsar::TableView tableView;
+    pulsar::Result res = client->client->createTableView(topic, 
conf->tableViewConfiguration, tableView);
+    if (res == pulsar::ResultOk) {
+        (*c_tableView) = new pulsar_table_view_t;
+        (*c_tableView)->tableView = std::move(tableView);
+        return pulsar_result_Ok;
+    }
+    return (pulsar_result)res;
+}
+
+void pulsar_client_create_table_view_async(pulsar_client_t *client, const char 
*topic,
+                                           pulsar_table_view_configuration_t 
*conf,
+                                           pulsar_table_view_callback 
callback, void *ctx) {
+    client->client->createTableViewAsync(topic, conf->tableViewConfiguration,
+                                         [callback, ctx](pulsar::Result 
result, pulsar::TableView tableView) {
+                                             if (result == pulsar::ResultOk) {
+                                                 auto *c_tableView = new 
pulsar_table_view_t;
+                                                 c_tableView->tableView = 
std::move(tableView);
+                                                 
callback((pulsar_result)result, c_tableView, ctx);
+                                             } else {
+                                                 
callback((pulsar_result)result, NULL, ctx);
+                                             }
+                                         });
+}
+
 pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, 
const char *topic,
                                                  pulsar_string_list_t 
**partitions) {
     std::vector<std::string> partitionsList;
diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc
new file mode 100644
index 0000000..520b055
--- /dev/null
+++ b/lib/c/c_TableView.cc
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/table_view.h>
+#include <string.h>
+
+#include "c_structs.h"
+
+static void *malloc_and_copy(const char *s, size_t slen) {
+    void *result = (void *)malloc(slen);
+    if (result == NULL) {
+        abort();
+    }
+    memcpy(result, s, slen);
+    return result;
+}
+
+bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const 
char *key, void **value,
+                                      size_t *value_size) {
+    std::string v;
+    bool result = table_view->tableView.retrieveValue(key, v);
+    if (result) {
+        *value = malloc_and_copy(v.c_str(), v.size());
+        *value_size = v.size();
+    }
+    return result;
+}
+
+bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char 
*key, void **value,
+                                 size_t *value_size) {
+    std::string v;
+    bool result = table_view->tableView.getValue(key, v);
+    if (result) {
+        *value = malloc_and_copy(v.c_str(), v.size());
+        *value_size = v.size();
+    }
+    return result;
+}
+
+bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char 
*key) {
+    return table_view->tableView.containsKey(key);
+}
+
+int pulsar_table_view_size(pulsar_table_view_t *table_view) { return 
table_view->tableView.size(); }
+
+void pulsar_table_view_for_each(pulsar_table_view_t *table_view, 
pulsar_table_view_action action, void *ctx) {
+    table_view->tableView.forEach([action, ctx](const std::string &key, const 
std::string &value) {
+        if (action) {
+            action(key.c_str(), value.c_str(), value.size(), ctx);
+        }
+    });
+}
+
+void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, 
pulsar_table_view_action action,
+                                           void *ctx) {
+    table_view->tableView.forEachAndListen([action, ctx](const std::string 
&key, const std::string &value) {
+        if (action) {
+            action(key.c_str(), value.c_str(), value.size(), ctx);
+        }
+    });
+}
+
+void pulsar_table_view_free(pulsar_table_view_t *table_view) { delete 
table_view; }
+
+pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) {
+    return (pulsar_result)table_view->tableView.close();
+}
+
+void pulsar_table_view_close_async(pulsar_table_view_t *table_view, 
pulsar_result_callback callback,
+                                   void *ctx) {
+    table_view->tableView.closeAsync(
+        [callback, ctx](pulsar::Result result) { return 
handle_result_callback(result, callback, ctx); });
+}
diff --git a/lib/c/c_TableViewConfiguration.cc 
b/lib/c/c_TableViewConfiguration.cc
new file mode 100644
index 0000000..3f8132a
--- /dev/null
+++ b/lib/c/c_TableViewConfiguration.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/c/table_view_configuration.h>
+
+#include "c_structs.h"
+
+pulsar_table_view_configuration_t *pulsar_table_view_configuration_create() {
+    auto *table_view_configuration_t = new pulsar_table_view_configuration_t;
+    table_view_configuration_t->tableViewConfiguration = 
pulsar::TableViewConfiguration();
+    return table_view_configuration_t;
+}
+
+void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t 
*conf) { delete conf; }
+
+void pulsar_table_view_configuration_set_schema_info(
+    pulsar_table_view_configuration_t *table_view_configuration_t, 
pulsar_schema_type schemaType,
+    const char *name, const char *schema, pulsar_string_map_t *properties) {
+    auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, 
schema, properties->map);
+    table_view_configuration_t->tableViewConfiguration.schemaInfo = schemaInfo;
+}
+
+void pulsar_table_view_configuration_set_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t, const char 
*subscription_name) {
+    table_view_configuration_t->tableViewConfiguration.subscriptionName = 
subscription_name;
+}
+
+const char *pulsar_table_view_configuration_get_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t) {
+    return 
table_view_configuration_t->tableViewConfiguration.subscriptionName.c_str();
+}
diff --git a/lib/c/c_structs.h b/lib/c/c_structs.h
index 1e9c376..6f15998 100644
--- a/lib/c/c_structs.h
+++ b/lib/c/c_structs.h
@@ -48,6 +48,14 @@ struct _pulsar_consumer_configuration {
     pulsar::ConsumerConfiguration consumerConfiguration;
 };
 
+struct _pulsar_table_view {
+    pulsar::TableView tableView;
+};
+
+struct _pulsar_table_view_configuration {
+    pulsar::TableViewConfiguration tableViewConfiguration;
+};
+
 struct _pulsar_reader {
     pulsar::Reader reader;
 };
diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc
new file mode 100644
index 0000000..4b7c532
--- /dev/null
+++ b/tests/c/c_TableViewTest.cc
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+#include <pulsar/c/table_view.h>
+#include <string.h>
+
+#include <future>
+
+static const char *lookup_url = "pulsar://localhost:6650";
+
+struct tv_create_result {
+    pulsar_result result;
+    pulsar_table_view_t *tableView;
+};
+
+static void create_tv_callback(pulsar_result result, pulsar_table_view_t 
*tableView, void *ctx) {
+    std::promise<tv_create_result> *create_promise = 
(std::promise<tv_create_result> *)ctx;
+    create_promise->set_value({result, tableView});
+}
+
+TEST(c_TableViewTest, testCreateTableViewAsync) {
+    const char *topic_name = 
"persistent://public/default/test-create-tv-async";
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    // Create table view.
+    pulsar_table_view_configuration_t *table_view_conf = 
pulsar_table_view_configuration_create();
+    pulsar_table_view_configuration_set_subscription_name(table_view_conf, 
"sub-name");
+    std::promise<tv_create_result> create_promise;
+    std::future<tv_create_result> create_future = create_promise.get_future();
+    pulsar_client_create_table_view_async(client, topic_name, table_view_conf, 
create_tv_callback,
+                                          &create_promise);
+    tv_create_result tvResult = create_future.get();
+    ASSERT_EQ(pulsar_result_Ok, tvResult.result);
+
+    pulsar_table_view_free(tvResult.tableView);
+    pulsar_client_close(client);
+}
+
+struct tv_action_ctx {
+    char *expect_data;
+    int size;
+    int expect_size;
+    std::promise<bool> *listen_promise;
+};
+
+static void tv_action(const char *key, const void *value, size_t value_size, 
void *ctx) {
+    tv_action_ctx *context = (tv_action_ctx *)ctx;
+    context->size++;
+    ASSERT_EQ(memcmp(value, context->expect_data, value_size), 0);
+    if (context->size == context->expect_size) {
+        context->listen_promise->set_value(true);
+    }
+}
+
+TEST(c_TableViewTest, testSimpleTableView) {
+    srand(time(NULL));
+    char topic_name[64];
+    snprintf(topic_name, 64, "persistent://public/default/test-table-view-%d", 
rand());
+    const char *sub_name = "my-sub-name";
+
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    pulsar_producer_configuration_t *producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, 
producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    // Send messages
+    const int num = 10;
+    const char *key1 = "key1";
+    const char *key2 = "key2";
+    size_t data_size = 4;
+    char *data = (char *)malloc(data_size);
+    data[0] = 0x01;
+    data[1] = 0x00;
+    data[2] = 0x02;
+    data[3] = 0x00;
+    for (int i = 0; i < num; i++) {
+        pulsar_message_t *message = pulsar_message_create();
+        if (i % 2 == 0) {
+            pulsar_message_set_partition_key(message, key1);
+        } else {
+            pulsar_message_set_partition_key(message, key2);
+        }
+        pulsar_message_set_content(message, data, data_size);
+        pulsar_result res = pulsar_producer_send(producer, message);
+        ASSERT_EQ(pulsar_result_Ok, res);
+        pulsar_message_free(message);
+    }
+
+    // Create table view.
+    pulsar_table_view_configuration_t *table_view_conf = 
pulsar_table_view_configuration_create();
+    pulsar_table_view_configuration_set_subscription_name(table_view_conf, 
sub_name);
+    pulsar_table_view_t *table_view;
+    result = pulsar_client_create_table_view(client, topic_name, 
table_view_conf, &table_view);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    // test get value
+    void *v1;
+    size_t v1_size;
+    ASSERT_EQ(pulsar_table_view_size(table_view), 2);
+    ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1, 
&v1_size));
+    ASSERT_EQ(v1_size, data_size);
+    ASSERT_EQ(memcmp(v1, data, data_size), 0);
+    free(v1);
+
+    // test for each.
+    tv_action_ctx ctx;
+    ctx.expect_data = data;
+    ctx.size = 0;
+    pulsar_table_view_for_each(table_view, tv_action, &ctx);
+    ASSERT_EQ(ctx.size, 2);
+
+    // test for each and listen
+    std::promise<bool> listen_promise;
+    std::future<bool> listen_future = listen_promise.get_future();
+    tv_action_ctx ctx2;
+    ctx2.expect_data = data;
+    ctx2.size = 0;
+    ctx2.expect_size = 3;
+    ctx2.listen_promise = &listen_promise;
+    pulsar_table_view_for_each_add_listen(table_view, tv_action, &ctx2);
+    ASSERT_EQ(ctx.size, 2);
+    // send more message.
+    pulsar_message_t *message = pulsar_message_create();
+    pulsar_message_set_partition_key(message, "key3");
+    pulsar_message_set_content(message, data, data_size);
+    pulsar_result res = pulsar_producer_send(producer, message);
+    ASSERT_EQ(pulsar_result_Ok, res);
+    pulsar_message_free(message);
+    // wait for message.
+    ASSERT_TRUE(listen_future.get());
+    ASSERT_EQ(ctx2.size, ctx2.expect_size);
+
+    // test retrieve value
+    void *v2;
+    size_t v2_size;
+    ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, 
&v2_size));
+    ASSERT_EQ(v2_size, data_size);
+    ASSERT_EQ(memcmp(v2, data, data_size), 0);
+    free(v2);
+
+    // test table view size
+    ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2"));
+    ASSERT_EQ(pulsar_table_view_size(table_view), 2);
+
+    free(data);
+    pulsar_producer_close(producer);
+    pulsar_table_view_close(table_view);
+    pulsar_client_close(client);
+    pulsar_table_view_free(table_view);
+    pulsar_table_view_configuration_free(table_view_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}

Reply via email to