This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 51d11db feat: Support table view for C client. (#294)
51d11db is described below
commit 51d11db0c6ed32f15ab06aca9a043ca44c9ecaea
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jul 7 16:22:35 2023 +0800
feat: Support table view for C client. (#294)
---
include/pulsar/TableView.h | 2 +-
include/pulsar/c/client.h | 51 ++++++++
include/pulsar/c/table_view.h | 141 ++++++++++++++++++++++
include/pulsar/c/table_view_configuration.h | 48 ++++++++
lib/c/c_Client.cc | 28 +++++
lib/c/c_TableView.cc | 89 ++++++++++++++
lib/c/c_TableViewConfiguration.cc | 46 ++++++++
lib/c/c_structs.h | 8 ++
tests/c/c_TableViewTest.cc | 176 ++++++++++++++++++++++++++++
9 files changed, 588 insertions(+), 1 deletion(-)
diff --git a/include/pulsar/TableView.h b/include/pulsar/TableView.h
index 2c2898b..d0c278a 100644
--- a/include/pulsar/TableView.h
+++ b/include/pulsar/TableView.h
@@ -51,7 +51,7 @@ class PULSAR_PUBLIC TableView {
* TableView view;
* std::string value;
* while (true) {
- * if (view.retrieveValue("key")) {
+ * if (view.retrieveValue("key", value)) {
* std::cout << "value is updated to: " << value;
* } else {
* // sleep for a while or print the message that value is not
updated
diff --git a/include/pulsar/c/client.h b/include/pulsar/c/client.h
index 3a53c01..5fdd287 100644
--- a/include/pulsar/c/client.h
+++ b/include/pulsar/c/client.h
@@ -30,6 +30,8 @@
#include <pulsar/c/reader_configuration.h>
#include <pulsar/c/result.h>
#include <pulsar/c/string_list.h>
+#include <pulsar/c/table_view.h>
+#include <pulsar/c/table_view_configuration.h>
#include <pulsar/defines.h>
#ifdef __cplusplus
@@ -47,6 +49,7 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result
result, pulsar_pro
typedef void (*pulsar_subscribe_callback)(pulsar_result result,
pulsar_consumer_t *consumer, void *ctx);
typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t
*reader, void *ctx);
+typedef void (*pulsar_table_view_callback)(pulsar_result result,
pulsar_table_view_t *tableView, void *ctx);
typedef void (*pulsar_get_partitions_callback)(pulsar_result result,
pulsar_string_list_t *partitions,
void *ctx);
@@ -172,6 +175,54 @@ PULSAR_PUBLIC void
pulsar_client_create_reader_async(pulsar_client_t *client, co
const pulsar_message_id_t
*startMessageId,
pulsar_reader_configuration_t *conf,
pulsar_reader_callback
callback, void *ctx);
+/**
+ * Create a table view with given {@code table_view_configuration} for
specified topic.
+ *
+ * The TableView provides a key-value map view of a compacted topic. Messages
without keys will
+ * be ignored.
+ *
+ * NOTE:
+ * When the returned result is `pulsar_result_Ok`, `*c_tableView` will point to the memory that
+ * is allocated internally. You have to call `pulsar_table_view_free` to free it.
+ *
+ * Example:
+ * ```c
+ * pulsar_table_view_configuration_t *table_view_conf =
pulsar_table_view_configuration_create();
+ * pulsar_table_view_configuration_set_subscription_name(table_view_conf,
sub_name);
+ * pulsar_table_view_t *table_view;
+ * pulsar_result result = pulsar_client_create_table_view(client, topic_name,
table_view_conf, &table_view);
+ *
+ * // do something...
+ *
+ * pulsar_table_view_close(table_view);
+ * pulsar_table_view_free(table_view);
+ * pulsar_table_view_configuration_free(table_view_conf);
+ *
+ * ```
+ *
+ * @param topic The name of the topic.
+ * @param conf The {@code table_view_configuration} pointer.
+ * @param c_tableView The pointer of the table_view pointer
+ * @return `pulsar_result_Ok` once the table_view is successfully linked to the topic and the map is
+ * built from the messages that already exist; otherwise the result code of the failed operation.
+ */
+PULSAR_PUBLIC pulsar_result pulsar_client_create_table_view(pulsar_client_t
*client, const char *topic,
+
pulsar_table_view_configuration_t *conf,
+
pulsar_table_view_t **c_tableView);
+
+/**
+ * Asynchronously create a table view with given {@code table_view_configuration} for specified topic.
+ * @param topic The name of the topic.
+ * @param conf The {@code table_view_configuration} pointer.
+ * @param callback
+ * 1. When the result in the callback is `ResultOk`, `tableView` in the
callback will point to the memory that
+ * is allocated internally. You have to call `pulsar_table_view_free` to free
it.
+ * 2. If the result in the callback is not `ResultOk`, `tableView` in the
callback will be nullptr.
+ * @param ctx
+ */
+PULSAR_PUBLIC void pulsar_client_create_table_view_async(pulsar_client_t
*client, const char *topic,
+
pulsar_table_view_configuration_t *conf,
+
pulsar_table_view_callback callback, void *ctx);
PULSAR_PUBLIC pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t
*client, const char *topic,
pulsar_string_list_t **partitions);
diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h
new file mode 100644
index 0000000..87eed3e
--- /dev/null
+++ b/include/pulsar/c/table_view.h
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/defines.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/message.h>
+#include <pulsar/c/messages.h>
+#include <pulsar/c/result.h>
+/* <stdbool.h> and <stddef.h> are included explicitly because the declarations below use `bool` and
+ * `size_t`; a C translation unit must not rely on another header providing them. */
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+/** Opaque handle of a table view. */
+typedef struct _pulsar_table_view pulsar_table_view_t;
+
+/**
+ * Action invoked for one key-value entry of the table view.
+ *
+ * @param key the entry key
+ * @param value pointer to the value bytes
+ * @param value_size number of bytes pointed to by `value`
+ * @param ctx the user context pointer that was passed together with the action
+ */
+typedef void (*pulsar_table_view_action)(const char *key, const void *value, size_t value_size, void *ctx);
+
+/** Callback that receives the result of an asynchronous operation together with the user context. */
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
+
+/**
+ * Move the latest value associated with the key.
+ *
+ * NOTE:
+ * 1. Once the value has been retrieved successfully,
+ * the associated value will be removed from the table view until next time
the value is updated.
+ * 2. Once the value has been retrieved successfully, `*value` will point to
the memory that is allocated
+ * internally. You have to call `free(value)` to free it.
+ *
+ * Example:
+ *
+ * ```c
+ * pulsar_table_view_t *table_view;
+ * void* value;
+ * size_t value_size;
+ * while (true) {
+ *     if (pulsar_table_view_retrieve_value(table_view, "key", &value, &value_size)) {
+ *         for (size_t i = 0; i < value_size; i++) {
+ *             printf("0x%02x%c", ((char*) value)[i], (i + 1 == value_size) ? '\n': ' ');
+ *         }
+ *         free(value);
+ *     } else {
+ *         // sleep for a while or print the message that value is not updated
+ *     }
+ * }
+ * ```
+ *
+ * @param table_view
+ * @param key
+ * @param value the value associated with the key
+ * @return true if there is an associated value of the key, otherwise false
+ */
+PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t
*table_view, const char *key,
+ void **value, size_t
*value_size);
+
+/**
+ * It's similar to `pulsar_table_view_retrieve_value` except that the associated value will not be
+ * removed from the table view.
+ *
+ * NOTE:
+ * Once the value has been obtained successfully, `*value` will point to the memory that is allocated
+ * internally. You have to call `free(value)` to free it.
+ *
+ * @param table_view
+ * @param key
+ * @param value the value associated with the key
+ * @return true if there is an associated value of the key, otherwise false
+ */
+PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t
*table_view, const char *key, void **value,
+ size_t *value_size);
+
+/**
+ * Check if the key exists in the table view.
+ * @param table_view
+ * @param key
+ * @return true if the key exists in the table view
+ */
+PULSAR_PUBLIC bool pulsar_table_view_contain_key(pulsar_table_view_t
*table_view, const char *key);
+
+/**
+ * Get the number of elements in the table view.
+ * @param table_view
+ * @return the number of elements
+ */
+PULSAR_PUBLIC int pulsar_table_view_size(pulsar_table_view_t *table_view);
+
+/**
+ * Performs the given action for each entry in this map until all entries have
been processed or the
+ * action throws an exception.
+ */
+PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view,
+ pulsar_table_view_action action,
void *ctx);
+
+/**
+ * Performs the given action for each entry in this map until all entries have been processed and
+ * registers the callback, which will be called each time a key-value pair is updated.
+ */
+PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t
*table_view,
+
pulsar_table_view_action action, void *ctx);
+
+/**
+ * Free the table view.
+ * @param table_view
+ */
+PULSAR_PUBLIC void pulsar_table_view_free(pulsar_table_view_t *table_view);
+
+/**
+ * Close the table view and stop the broker from pushing more messages
+ * @param table_view
+ * @return
+ */
+PULSAR_PUBLIC pulsar_result pulsar_table_view_close(pulsar_table_view_t
*table_view);
+
+/**
+ * Asynchronously close the table view and stop the broker from pushing more messages
+ * @param table_view
+ * @param callback
+ * @param ctx
+ */
+PULSAR_PUBLIC void pulsar_table_view_close_async(pulsar_table_view_t
*table_view,
+ pulsar_result_callback
callback, void *ctx);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/include/pulsar/c/table_view_configuration.h
b/include/pulsar/c/table_view_configuration.h
new file mode 100644
index 0000000..26389b6
--- /dev/null
+++ b/include/pulsar/c/table_view_configuration.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/defines.h>
+
+#include "producer_configuration.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_table_view_configuration
pulsar_table_view_configuration_t;
+
+PULSAR_PUBLIC pulsar_table_view_configuration_t
*pulsar_table_view_configuration_create();
+
+PULSAR_PUBLIC void
pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf);
+
+PULSAR_PUBLIC void pulsar_table_view_configuration_set_schema_info(
+ pulsar_table_view_configuration_t *table_view_configuration_t,
pulsar_schema_type schemaType,
+ const char *name, const char *schema, pulsar_string_map_t *properties);
+
+PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name(
+ pulsar_table_view_configuration_t *table_view_configuration_t, const char
*subscription_name);
+
+PULSAR_PUBLIC const char
*pulsar_table_view_configuration_get_subscription_name(
+ pulsar_table_view_configuration_t *table_view_configuration_t);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/lib/c/c_Client.cc b/lib/c/c_Client.cc
index dd1b71e..9f27538 100644
--- a/lib/c/c_Client.cc
+++ b/lib/c/c_Client.cc
@@ -192,6 +192,34 @@ void pulsar_client_create_reader_async(pulsar_client_t
*client, const char *topi
std::bind(&handle_reader_callback, std::placeholders::_1,
std::placeholders::_2, callback, ctx));
}
+/**
+ * Synchronously create a table view on `topic` and hand it back through `c_tableView`.
+ *
+ * `*c_tableView` is only written when the creation succeeds; in that case it points to a handle
+ * allocated with `new`, which `pulsar_table_view_free` deletes.
+ */
+pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic,
+                                              pulsar_table_view_configuration_t *conf,
+                                              pulsar_table_view_t **c_tableView) {
+    pulsar::TableView view;
+    const pulsar::Result status = client->client->createTableView(topic, conf->tableViewConfiguration, view);
+    if (status != pulsar::ResultOk) {
+        // Failure: leave the output pointer untouched and report the error code.
+        return (pulsar_result)status;
+    }
+
+    pulsar_table_view_t *handle = new pulsar_table_view_t;
+    handle->tableView = std::move(view);
+    *c_tableView = handle;
+    return pulsar_result_Ok;
+}
+
+/**
+ * Asynchronously create a table view on `topic`.
+ *
+ * On success the callback receives a handle allocated with `new` (release it with
+ * `pulsar_table_view_free`); on failure it receives NULL together with the error code.
+ * A NULL `callback` is tolerated: nothing is allocated and nothing is invoked, matching the NULL
+ * handling of the `pulsar_table_view_for_each*` wrappers.
+ */
+void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic,
+                                           pulsar_table_view_configuration_t *conf,
+                                           pulsar_table_view_callback callback, void *ctx) {
+    client->client->createTableViewAsync(
+        topic, conf->tableViewConfiguration,
+        [callback, ctx](pulsar::Result result, pulsar::TableView tableView) {
+            if (!callback) {
+                // Nobody can receive the handle, so do not allocate one that would leak.
+                return;
+            }
+            if (result != pulsar::ResultOk) {
+                callback((pulsar_result)result, NULL, ctx);
+                return;
+            }
+            auto *c_tableView = new pulsar_table_view_t;
+            c_tableView->tableView = std::move(tableView);
+            callback((pulsar_result)result, c_tableView, ctx);
+        });
+}
+
pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client,
const char *topic,
pulsar_string_list_t
**partitions) {
std::vector<std::string> partitionsList;
diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc
new file mode 100644
index 0000000..520b055
--- /dev/null
+++ b/lib/c/c_TableView.cc
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/table_view.h>
+#include <string.h>
+
+#include "c_structs.h"
+
+/**
+ * Copy `slen` bytes starting at `s` into a freshly malloc'ed buffer.
+ *
+ * The returned pointer is never NULL and must be released with `free`. The process is aborted when
+ * the allocation fails.
+ */
+static void *malloc_and_copy(const char *s, size_t slen) {
+    // malloc(0) is allowed to return NULL, which must not be mistaken for an allocation failure,
+    // so always request at least one byte.
+    void *result = malloc(slen > 0 ? slen : 1);
+    if (result == NULL) {
+        abort();
+    }
+    if (slen > 0) {
+        memcpy(result, s, slen);
+    }
+    return result;
+}
+
+/**
+ * C wrapper around `TableView::retrieveValue`.
+ *
+ * When a value is found, its bytes are copied into a malloc'ed buffer stored in `*value` and the
+ * byte count is stored in `*value_size`. When no value is found, neither output is written.
+ */
+bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, void **value,
+                                      size_t *value_size) {
+    std::string payload;
+    if (!table_view->tableView.retrieveValue(key, payload)) {
+        return false;
+    }
+    *value = malloc_and_copy(payload.c_str(), payload.size());
+    *value_size = payload.size();
+    return true;
+}
+
+/**
+ * C wrapper around `TableView::getValue`.
+ *
+ * When a value is found, its bytes are copied into a malloc'ed buffer stored in `*value` and the
+ * byte count is stored in `*value_size`. When no value is found, neither output is written.
+ */
+bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value,
+                                 size_t *value_size) {
+    std::string payload;
+    if (!table_view->tableView.getValue(key, payload)) {
+        return false;
+    }
+    *value = malloc_and_copy(payload.c_str(), payload.size());
+    *value_size = payload.size();
+    return true;
+}
+
+/** Forward to `TableView::containsKey` and return its answer. */
+bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key) {
+    const bool present = table_view->tableView.containsKey(key);
+    return present;
+}
+
+/** Return `TableView::size()` converted to `int`. */
+int pulsar_table_view_size(pulsar_table_view_t *table_view) {
+    return static_cast<int>(table_view->tableView.size());
+}
+
+/**
+ * Pass every entry to `action` through `TableView::forEach`.
+ *
+ * `action` receives the key as a C string, the value bytes with their length, and `ctx`. A NULL
+ * `action` is skipped for every entry.
+ */
+void pulsar_table_view_for_each(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx) {
+    auto visitor = [action, ctx](const std::string &key, const std::string &value) {
+        if (!action) {
+            return;
+        }
+        action(key.c_str(), value.c_str(), value.size(), ctx);
+    };
+    table_view->tableView.forEach(visitor);
+}
+
+/**
+ * Pass `action` to `TableView::forEachAndListen`.
+ *
+ * `action` receives the key as a C string, the value bytes with their length, and `ctx`. A NULL
+ * `action` is skipped on every invocation.
+ */
+void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, pulsar_table_view_action action,
+                                           void *ctx) {
+    auto listener = [action, ctx](const std::string &key, const std::string &value) {
+        if (!action) {
+            return;
+        }
+        action(key.c_str(), value.c_str(), value.size(), ctx);
+    };
+    table_view->tableView.forEachAndListen(listener);
+}
+
+/** Delete the handle that was allocated when the table view was created. */
+void pulsar_table_view_free(pulsar_table_view_t *table_view) {
+    delete table_view;
+}
+
+/** Call `TableView::close()` and return its result converted to `pulsar_result`. */
+pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) {
+    const pulsar::Result status = table_view->tableView.close();
+    return (pulsar_result)status;
+}
+
+/**
+ * Call `TableView::closeAsync()`; the result is forwarded, together with `callback` and `ctx`, to
+ * `handle_result_callback`.
+ */
+void pulsar_table_view_close_async(pulsar_table_view_t *table_view, pulsar_result_callback callback,
+                                   void *ctx) {
+    auto on_closed = [callback, ctx](pulsar::Result result) {
+        return handle_result_callback(result, callback, ctx);
+    };
+    table_view->tableView.closeAsync(on_closed);
+}
diff --git a/lib/c/c_TableViewConfiguration.cc
b/lib/c/c_TableViewConfiguration.cc
new file mode 100644
index 0000000..3f8132a
--- /dev/null
+++ b/lib/c/c_TableViewConfiguration.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/c/table_view_configuration.h>
+
+#include "c_structs.h"
+
+/**
+ * Allocate a table view configuration holding a default-constructed
+ * `pulsar::TableViewConfiguration`. Release it with `pulsar_table_view_configuration_free`.
+ */
+pulsar_table_view_configuration_t *pulsar_table_view_configuration_create() {
+    pulsar_table_view_configuration_t *conf = new pulsar_table_view_configuration_t;
+    conf->tableViewConfiguration = pulsar::TableViewConfiguration();
+    return conf;
+}
+
+/** Delete a configuration created by `pulsar_table_view_configuration_create`. */
+void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf) {
+    delete conf;
+}
+
+/**
+ * Build a `pulsar::SchemaInfo` from the given type, name, schema definition and property map, and
+ * store it in the configuration.
+ */
+void pulsar_table_view_configuration_set_schema_info(
+    pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType,
+    const char *name, const char *schema, pulsar_string_map_t *properties) {
+    auto info = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
+    auto &target = table_view_configuration_t->tableViewConfiguration;
+    target.schemaInfo = info;
+}
+
+/** Store `subscription_name` in the configuration. */
+void pulsar_table_view_configuration_set_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name) {
+    auto &target = table_view_configuration_t->tableViewConfiguration;
+    target.subscriptionName = subscription_name;
+}
+
+/**
+ * Return the configured subscription name as a C string. The pointer refers to the string stored
+ * inside the configuration object.
+ */
+const char *pulsar_table_view_configuration_get_subscription_name(
+    pulsar_table_view_configuration_t *table_view_configuration_t) {
+    const auto &stored = table_view_configuration_t->tableViewConfiguration;
+    return stored.subscriptionName.c_str();
+}
diff --git a/lib/c/c_structs.h b/lib/c/c_structs.h
index 1e9c376..6f15998 100644
--- a/lib/c/c_structs.h
+++ b/lib/c/c_structs.h
@@ -48,6 +48,14 @@ struct _pulsar_consumer_configuration {
pulsar::ConsumerConfiguration consumerConfiguration;
};
+// Definition behind the C handle `pulsar_table_view_t`: holds the wrapped pulsar::TableView by value.
+struct _pulsar_table_view {
+ pulsar::TableView tableView;
+};
+
+// Definition behind the C handle `pulsar_table_view_configuration_t`: holds the wrapped
+// pulsar::TableViewConfiguration by value.
+struct _pulsar_table_view_configuration {
+ pulsar::TableViewConfiguration tableViewConfiguration;
+};
+
struct _pulsar_reader {
pulsar::Reader reader;
};
diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc
new file mode 100644
index 0000000..4b7c532
--- /dev/null
+++ b/tests/c/c_TableViewTest.cc
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+#include <pulsar/c/table_view.h>
+#include <string.h>
+
+#include <future>
+
+static const char *lookup_url = "pulsar://localhost:6650";
+
+// Pair delivered through the promise by create_tv_callback: the result code of the asynchronous
+// creation and the table view pointer passed to the callback.
+struct tv_create_result {
+ pulsar_result result;
+ pulsar_table_view_t *tableView;
+};
+
+// Creation callback used by the test: `ctx` carries a std::promise<tv_create_result>, which is
+// fulfilled with the result code and the table view pointer received here.
+static void create_tv_callback(pulsar_result result, pulsar_table_view_t *tableView, void *ctx) {
+    auto *promise = (std::promise<tv_create_result> *)ctx;
+    tv_create_result outcome = {result, tableView};
+    promise->set_value(outcome);
+}
+
+// Creates a table view through the asynchronous C API and checks that the callback reports success.
+TEST(c_TableViewTest, testCreateTableViewAsync) {
+    const char *topic_name = "persistent://public/default/test-create-tv-async";
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    // Create table view.
+    pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create();
+    pulsar_table_view_configuration_set_subscription_name(table_view_conf, "sub-name");
+    std::promise<tv_create_result> create_promise;
+    std::future<tv_create_result> create_future = create_promise.get_future();
+    pulsar_client_create_table_view_async(client, topic_name, table_view_conf, create_tv_callback,
+                                          &create_promise);
+    tv_create_result tvResult = create_future.get();
+    ASSERT_EQ(pulsar_result_Ok, tvResult.result);
+
+    // Release every object created above, in the same order testSimpleTableView uses, so the test
+    // does not leak the client and the two configurations.
+    pulsar_table_view_close(tvResult.tableView);
+    pulsar_client_close(client);
+    pulsar_table_view_free(tvResult.tableView);
+    pulsar_table_view_configuration_free(table_view_conf);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
+
+// Context handed to tv_action. Every member has a default so that a context which only sets some of
+// the fields never exposes indeterminate values to the callback.
+struct tv_action_ctx {
+    // Bytes every visited value is compared against.
+    char *expect_data = nullptr;
+    // Number of entries visited so far.
+    int size = 0;
+    // Visit count at which the promise is fulfilled; the default of -1 is never reached.
+    int expect_size = -1;
+    // Promise fulfilled once `size` reaches `expect_size`.
+    std::promise<bool> *listen_promise = nullptr;
+};
+
+// Table view action used by the tests: counts the visited entry, compares its value with the
+// expected bytes and fulfils the promise once the expected number of entries has been seen.
+static void tv_action(const char *key, const void *value, size_t value_size, void *ctx) {
+    tv_action_ctx *context = (tv_action_ctx *)ctx;
+    context->size++;
+    // EXPECT (not ASSERT): a mismatch must not return early, otherwise the promise below is never
+    // fulfilled and the test blocks forever on the future instead of failing.
+    EXPECT_EQ(memcmp(value, context->expect_data, value_size), 0);
+    if (context->size == context->expect_size && context->listen_promise != nullptr) {
+        context->listen_promise->set_value(true);
+    }
+}
+
+// End-to-end test of the C table view API: produce keyed messages, create a table view and exercise
+// get_value, for_each, for_each_add_listen, retrieve_value, contain_key and size.
+TEST(c_TableViewTest, testSimpleTableView) {
+    srand(time(NULL));
+    char topic_name[64];
+    snprintf(topic_name, 64, "persistent://public/default/test-table-view-%d", rand());
+    const char *sub_name = "my-sub-name";
+
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    // Send messages
+    const int num = 10;
+    const char *key1 = "key1";
+    const char *key2 = "key2";
+    size_t data_size = 4;
+    char *data = (char *)malloc(data_size);
+    // The payload contains embedded zero bytes to check that values are handled as binary data.
+    data[0] = 0x01;
+    data[1] = 0x00;
+    data[2] = 0x02;
+    data[3] = 0x00;
+    for (int i = 0; i < num; i++) {
+        pulsar_message_t *message = pulsar_message_create();
+        if (i % 2 == 0) {
+            pulsar_message_set_partition_key(message, key1);
+        } else {
+            pulsar_message_set_partition_key(message, key2);
+        }
+        pulsar_message_set_content(message, data, data_size);
+        pulsar_result res = pulsar_producer_send(producer, message);
+        ASSERT_EQ(pulsar_result_Ok, res);
+        pulsar_message_free(message);
+    }
+
+    // Create table view.
+    pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create();
+    pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name);
+    pulsar_table_view_t *table_view;
+    result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    // test get value
+    void *v1;
+    size_t v1_size;
+    ASSERT_EQ(pulsar_table_view_size(table_view), 2);
+    ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1, &v1_size));
+    ASSERT_EQ(v1_size, data_size);
+    ASSERT_EQ(memcmp(v1, data, data_size), 0);
+    free(v1);
+
+    // test for each.
+    tv_action_ctx ctx;
+    ctx.expect_data = data;
+    ctx.size = 0;
+    // tv_action reads expect_size and listen_promise, so give them defined values: no promise is
+    // needed here, and -1 can never equal the running count.
+    ctx.expect_size = -1;
+    ctx.listen_promise = nullptr;
+    pulsar_table_view_for_each(table_view, tv_action, &ctx);
+    ASSERT_EQ(ctx.size, 2);
+
+    // test for each and listen
+    std::promise<bool> listen_promise;
+    std::future<bool> listen_future = listen_promise.get_future();
+    tv_action_ctx ctx2;
+    ctx2.expect_data = data;
+    ctx2.size = 0;
+    ctx2.expect_size = 3;
+    ctx2.listen_promise = &listen_promise;
+    pulsar_table_view_for_each_add_listen(table_view, tv_action, &ctx2);
+    // The two existing entries must already have been visited through the listening context.
+    ASSERT_EQ(ctx2.size, 2);
+    // send more message.
+    pulsar_message_t *message = pulsar_message_create();
+    pulsar_message_set_partition_key(message, "key3");
+    pulsar_message_set_content(message, data, data_size);
+    pulsar_result res = pulsar_producer_send(producer, message);
+    ASSERT_EQ(pulsar_result_Ok, res);
+    pulsar_message_free(message);
+    // wait for message.
+    ASSERT_TRUE(listen_future.get());
+    ASSERT_EQ(ctx2.size, ctx2.expect_size);
+
+    // test retrieve value
+    void *v2;
+    size_t v2_size;
+    ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, &v2_size));
+    ASSERT_EQ(v2_size, data_size);
+    ASSERT_EQ(memcmp(v2, data, data_size), 0);
+    free(v2);
+
+    // test table view size
+    ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2"));
+    ASSERT_EQ(pulsar_table_view_size(table_view), 2);
+
+    free(data);
+    pulsar_producer_close(producer);
+    pulsar_table_view_close(table_view);
+    pulsar_client_close(client);
+    pulsar_table_view_free(table_view);
+    pulsar_table_view_configuration_free(table_view_conf);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}