This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new fa7a6d1 Support seting topic schema when creating producers/consumers
(#223)
fa7a6d1 is described below
commit fa7a6d1f0e2859522418203ec841a12e497377ba
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Jul 5 15:58:03 2022 +0900
Support seting topic schema when creating producers/consumers (#223)
* Set topic schema support
Added class to allow setting topic schema when creating
producers/consumers
* forgotten code cleanup
* Fix compilation errors
Co-authored-by: Liam Condon <[email protected]>
---
binding.gyp | 1 +
src/ConsumerConfig.cc | 8 ++++++
src/ProducerConfig.cc | 9 +++++-
src/SchemaInfo.cc | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++
src/SchemaInfo.h | 41 +++++++++++++++++++++++++++
5 files changed, 136 insertions(+), 1 deletion(-)
diff --git a/binding.gyp b/binding.gyp
index 66f3d0a..0bc1eaf 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -47,6 +47,7 @@
"src/MessageId.cc",
"src/Authentication.cc",
"src/Client.cc",
+ "src/SchemaInfo.cc",
"src/Producer.cc",
"src/ProducerConfig.cc",
"src/Consumer.cc",
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 3f9ce13..dd48459 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -19,6 +19,7 @@
#include "ConsumerConfig.h"
#include "Consumer.h"
+#include "SchemaInfo.h"
#include "Message.h"
#include <pulsar/c/consumer_configuration.h>
#include <pulsar/c/consumer.h>
@@ -38,6 +39,7 @@ static const std::string CFG_CONSUMER_NAME = "consumerName";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_LISTENER = "listener";
static const std::string CFG_READ_COMPACTED = "readCompacted";
+static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PRIVATE_KEY_PATH = "privateKeyPath";
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
@@ -144,6 +146,12 @@ ConsumerConfig::ConsumerConfig(const Napi::Object
&consumerConfig, pulsar_messag
}
}
+ if (consumerConfig.Has(CFG_SCHEMA) &&
consumerConfig.Get(CFG_SCHEMA).IsObject()) {
+ SchemaInfo *schemaInfo = new
SchemaInfo(consumerConfig.Get(CFG_SCHEMA).ToObject());
+ schemaInfo->SetConsumerSchema(this->cConsumerConfig);
+ delete schemaInfo;
+ }
+
if (consumerConfig.Has(CFG_PROPS) &&
consumerConfig.Get(CFG_PROPS).IsObject()) {
Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject();
Napi::Array arr = propObj.GetPropertyNames();
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 6e42a53..289c55f 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-
+#include "SchemaInfo.h"
#include "ProducerConfig.h"
#include <map>
@@ -33,6 +33,7 @@ static const std::string CFG_COMPRESS_TYPE =
"compressionType";
static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
+static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
@@ -152,6 +153,12 @@ ProducerConfig::ProducerConfig(const Napi::Object&
producerConfig) : topic("") {
}
}
+ if (producerConfig.Has(CFG_SCHEMA) &&
producerConfig.Get(CFG_SCHEMA).IsObject()) {
+ SchemaInfo* schemaInfo = new
SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
+ schemaInfo->SetProducerSchema(this->cProducerConfig);
+ delete schemaInfo;
+ }
+
if (producerConfig.Has(CFG_PROPS) &&
producerConfig.Get(CFG_PROPS).IsObject()) {
Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject();
Napi::Array arr = propObj.GetPropertyNames();
diff --git a/src/SchemaInfo.cc b/src/SchemaInfo.cc
new file mode 100644
index 0000000..b6943f2
--- /dev/null
+++ b/src/SchemaInfo.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "SchemaInfo.h"
+#include <map>
+
+static const std::string CFG_SCHEMA_TYPE = "schemaType";
+static const std::string CFG_NAME = "name";
+static const std::string CFG_SCHEMA = "schema";
+static const std::string CFG_PROPS = "properties";
+
+static const std::map<std::string, pulsar_schema_type> SCHEMA_TYPE = {{"None",
pulsar_None},
+
{"String", pulsar_String},
+ {"Json",
pulsar_Json},
+
{"Protobuf", pulsar_Protobuf},
+ {"Avro",
pulsar_Avro},
+
{"Boolean", pulsar_Boolean},
+ {"Int8",
pulsar_Int8},
+
{"Int16", pulsar_Int16},
+
{"Int32", pulsar_Int32},
+
{"Int64", pulsar_Int64},
+
{"Float32", pulsar_Float32},
+
{"Float64", pulsar_Float64},
+
{"KeyValue", pulsar_KeyValue},
+
{"Bytes", pulsar_Bytes},
+
{"AutoConsume", pulsar_AutoConsume},
+
{"AutoPublish", pulsar_AutoPublish}};
+
+SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) :
cSchemaType(pulsar_Bytes), name("BYTES"), schema() {
+ this->cProperties = pulsar_string_map_create();
+ if (schemaInfo.Has(CFG_SCHEMA_TYPE) &&
schemaInfo.Get(CFG_SCHEMA_TYPE).IsString()) {
+ this->name = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();
+ this->cSchemaType =
SCHEMA_TYPE.at(schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value());
+ }
+ if (schemaInfo.Has(CFG_NAME) && schemaInfo.Get(CFG_NAME).IsString()) {
+ this->name = schemaInfo.Get(CFG_NAME).ToString().Utf8Value();
+ }
+ if (schemaInfo.Has(CFG_SCHEMA) && schemaInfo.Get(CFG_SCHEMA).IsString()) {
+ this->schema = schemaInfo.Get(CFG_SCHEMA).ToString().Utf8Value();
+ }
+ if (schemaInfo.Has(CFG_PROPS) && schemaInfo.Get(CFG_PROPS).IsObject()) {
+ Napi::Object propObj = schemaInfo.Get(CFG_PROPS).ToObject();
+ Napi::Array arr = propObj.GetPropertyNames();
+ int size = arr.Length();
+ for (int i = 0; i < size; i++) {
+ Napi::String key = arr.Get(i).ToString();
+ Napi::String value = propObj.Get(key).ToString();
+ pulsar_string_map_put(this->cProperties, key.Utf8Value().c_str(),
value.Utf8Value().c_str());
+ }
+ }
+}
+
+void
SchemaInfo::SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t>
cProducerConfiguration) {
+ pulsar_producer_configuration_set_schema_info(cProducerConfiguration.get(),
this->cSchemaType,
+ this->name.c_str(),
this->schema.c_str(), this->cProperties);
+}
+
+void
SchemaInfo::SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t>
cConsumerConfiguration) {
+ pulsar_consumer_configuration_set_schema_info(cConsumerConfiguration.get(),
this->cSchemaType,
+ this->name.c_str(),
this->schema.c_str(), this->cProperties);
+}
+
+SchemaInfo::~SchemaInfo() { pulsar_string_map_free(this->cProperties); }
diff --git a/src/SchemaInfo.h b/src/SchemaInfo.h
new file mode 100644
index 0000000..90674f5
--- /dev/null
+++ b/src/SchemaInfo.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef SCHEMA_INFO_H
+#define SCHEMA_INFO_H
+
+#include <napi.h>
+#include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/consumer_configuration.h>
+
+class SchemaInfo {
+ public:
+ SchemaInfo(const Napi::Object &schemaInfo);
+ ~SchemaInfo();
+ void SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t>
cProducerConfiguration);
+ void SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t>
cConsumerConfiguration);
+
+ private:
+ pulsar_schema_type cSchemaType;
+ std::string name;
+ std::string schema;
+ pulsar_string_map_t *cProperties;
+};
+
+#endif