This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7a6d1  Support setting topic schema when creating producers/consumers 
(#223)
fa7a6d1 is described below

commit fa7a6d1f0e2859522418203ec841a12e497377ba
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Jul 5 15:58:03 2022 +0900

    Support setting topic schema when creating producers/consumers (#223)
    
    * Set topic schema support
    
    Added class to allow setting topic schema when creating
    producers/consumers
    
    * forgotten code cleanup
    
    * Fix compilation errors
    
    Co-authored-by: Liam Condon <[email protected]>
---
 binding.gyp           |  1 +
 src/ConsumerConfig.cc |  8 ++++++
 src/ProducerConfig.cc |  9 +++++-
 src/SchemaInfo.cc     | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/SchemaInfo.h      | 41 +++++++++++++++++++++++++++
 5 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/binding.gyp b/binding.gyp
index 66f3d0a..0bc1eaf 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -47,6 +47,7 @@
         "src/MessageId.cc",
         "src/Authentication.cc",
         "src/Client.cc",
+        "src/SchemaInfo.cc",
         "src/Producer.cc",
         "src/ProducerConfig.cc",
         "src/Consumer.cc",
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 3f9ce13..dd48459 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -19,6 +19,7 @@
 
 #include "ConsumerConfig.h"
 #include "Consumer.h"
+#include "SchemaInfo.h"
 #include "Message.h"
 #include <pulsar/c/consumer_configuration.h>
 #include <pulsar/c/consumer.h>
@@ -38,6 +39,7 @@ static const std::string CFG_CONSUMER_NAME = "consumerName";
 static const std::string CFG_PROPS = "properties";
 static const std::string CFG_LISTENER = "listener";
 static const std::string CFG_READ_COMPACTED = "readCompacted";
+static const std::string CFG_SCHEMA = "schema";
 static const std::string CFG_PRIVATE_KEY_PATH = "privateKeyPath";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
 
@@ -144,6 +146,12 @@ ConsumerConfig::ConsumerConfig(const Napi::Object 
&consumerConfig, pulsar_messag
     }
   }
 
+  if (consumerConfig.Has(CFG_SCHEMA) && 
consumerConfig.Get(CFG_SCHEMA).IsObject()) {
+    SchemaInfo *schemaInfo = new 
SchemaInfo(consumerConfig.Get(CFG_SCHEMA).ToObject());
+    schemaInfo->SetConsumerSchema(this->cConsumerConfig);
+    delete schemaInfo;
+  }
+
   if (consumerConfig.Has(CFG_PROPS) && 
consumerConfig.Get(CFG_PROPS).IsObject()) {
     Napi::Object propObj = consumerConfig.Get(CFG_PROPS).ToObject();
     Napi::Array arr = propObj.GetPropertyNames();
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 6e42a53..289c55f 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
+#include "SchemaInfo.h"
 #include "ProducerConfig.h"
 #include <map>
 
@@ -33,6 +33,7 @@ static const std::string CFG_COMPRESS_TYPE = 
"compressionType";
 static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
 static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
 static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
+static const std::string CFG_SCHEMA = "schema";
 static const std::string CFG_PROPS = "properties";
 static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
 static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
@@ -152,6 +153,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
     }
   }
 
+  if (producerConfig.Has(CFG_SCHEMA) && 
producerConfig.Get(CFG_SCHEMA).IsObject()) {
+    SchemaInfo* schemaInfo = new 
SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
+    schemaInfo->SetProducerSchema(this->cProducerConfig);
+    delete schemaInfo;
+  }
+
   if (producerConfig.Has(CFG_PROPS) && 
producerConfig.Get(CFG_PROPS).IsObject()) {
     Napi::Object propObj = producerConfig.Get(CFG_PROPS).ToObject();
     Napi::Array arr = propObj.GetPropertyNames();
diff --git a/src/SchemaInfo.cc b/src/SchemaInfo.cc
new file mode 100644
index 0000000..b6943f2
--- /dev/null
+++ b/src/SchemaInfo.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "SchemaInfo.h"
+#include <map>
+
+static const std::string CFG_SCHEMA_TYPE = "schemaType";  // key: schema type name, looked up in SCHEMA_TYPE
+static const std::string CFG_NAME = "name";  // key: optional schema name
+static const std::string CFG_SCHEMA = "schema";  // key: schema string handed to the C client as-is
+static const std::string CFG_PROPS = "properties";  // key: object whose entries become string properties
+
+static const std::map<std::string, pulsar_schema_type> SCHEMA_TYPE = {{"None",  // "schemaType" string -> C client enum value
pulsar_None},
+                                                                      
{"String", pulsar_String},
+                                                                      {"Json", 
pulsar_Json},
+                                                                      
{"Protobuf", pulsar_Protobuf},
+                                                                      {"Avro", 
pulsar_Avro},
+                                                                      
{"Boolean", pulsar_Boolean},
+                                                                      {"Int8", 
pulsar_Int8},
+                                                                      
{"Int16", pulsar_Int16},
+                                                                      
{"Int32", pulsar_Int32},
+                                                                      
{"Int64", pulsar_Int64},
+                                                                      
{"Float32", pulsar_Float32},
+                                                                      
{"Float64", pulsar_Float64},
+                                                                      
{"KeyValue", pulsar_KeyValue},
+                                                                      
{"Bytes", pulsar_Bytes},
+                                                                      
{"AutoConsume", pulsar_AutoConsume},
+                                                                      
{"AutoPublish", pulsar_AutoPublish}};  // keys are matched exactly (case-sensitive std::map lookup)
+
+SchemaInfo::SchemaInfo(const Napi::Object &schemaInfo) :  // Builds schema settings from a JS object; every key is optional.
cSchemaType(pulsar_Bytes), name("BYTES"), schema() {  // defaults used when the corresponding keys are absent or mistyped
+  this->cProperties = pulsar_string_map_create();  // freed in ~SchemaInfo()
+  if (schemaInfo.Has(CFG_SCHEMA_TYPE) && 
schemaInfo.Get(CFG_SCHEMA_TYPE).IsString()) {
+    this->name = schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value();  // name initially mirrors the type string
+    this->cSchemaType =  // NOTE(review): std::map::at throws std::out_of_range when the type string is not in SCHEMA_TYPE
SCHEMA_TYPE.at(schemaInfo.Get(CFG_SCHEMA_TYPE).ToString().Utf8Value());
+  }
+  if (schemaInfo.Has(CFG_NAME) && schemaInfo.Get(CFG_NAME).IsString()) {
+    this->name = schemaInfo.Get(CFG_NAME).ToString().Utf8Value();  // an explicit "name" overrides the type-derived name
+  }
+  if (schemaInfo.Has(CFG_SCHEMA) && schemaInfo.Get(CFG_SCHEMA).IsString()) {
+    this->schema = schemaInfo.Get(CFG_SCHEMA).ToString().Utf8Value();  // otherwise schema stays the empty string
+  }
+  if (schemaInfo.Has(CFG_PROPS) && schemaInfo.Get(CFG_PROPS).IsObject()) {
+    Napi::Object propObj = schemaInfo.Get(CFG_PROPS).ToObject();
+    Napi::Array arr = propObj.GetPropertyNames();
+    int size = arr.Length();  // NOTE(review): Length() is unsigned in node-addon-api; narrowed to int here - confirm acceptable
+    for (int i = 0; i < size; i++) {  // copy each property into the C string map
+      Napi::String key = arr.Get(i).ToString();
+      Napi::String value = propObj.Get(key).ToString();  // values are coerced with ToString(), whatever their JS type
+      pulsar_string_map_put(this->cProperties, key.Utf8Value().c_str(), 
value.Utf8Value().c_str());
+    }
+  }
+}
+
+void  // Passes the stored type, name, schema string and properties to the given producer configuration.
SchemaInfo::SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t> 
cProducerConfiguration) {
+  pulsar_producer_configuration_set_schema_info(cProducerConfiguration.get(), 
this->cSchemaType,
+                                                this->name.c_str(), 
this->schema.c_str(), this->cProperties);  // NOTE(review): presumably the C client copies these arguments, since callers delete this object right after - confirm
+}
+
+void  // Passes the stored type, name, schema string and properties to the given consumer configuration.
SchemaInfo::SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t> 
cConsumerConfiguration) {
+  pulsar_consumer_configuration_set_schema_info(cConsumerConfiguration.get(), 
this->cSchemaType,
+                                                this->name.c_str(), 
this->schema.c_str(), this->cProperties);  // NOTE(review): presumably the C client copies these arguments, since callers delete this object right after - confirm
+}
+
+SchemaInfo::~SchemaInfo() { pulsar_string_map_free(this->cProperties); }  // frees the string map created in the constructor
diff --git a/src/SchemaInfo.h b/src/SchemaInfo.h
new file mode 100644
index 0000000..90674f5
--- /dev/null
+++ b/src/SchemaInfo.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef SCHEMA_INFO_H
+#define SCHEMA_INFO_H
+
+#include <napi.h>
+#include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/consumer_configuration.h>
+
+class SchemaInfo {  // Schema settings read from a JS object, applied to Pulsar C producer/consumer configurations.
+ public:
+  SchemaInfo(const Napi::Object &schemaInfo);  // reads the optional schemaType/name/schema/properties keys
+  ~SchemaInfo();  // frees cProperties
+  void SetProducerSchema(std::shared_ptr<pulsar_producer_configuration_t>  // applies the schema to a producer configuration
cProducerConfiguration);
+  void SetConsumerSchema(std::shared_ptr<pulsar_consumer_configuration_t>  // applies the schema to a consumer configuration
cConsumerConfiguration);
+
+ private:
+  pulsar_schema_type cSchemaType;  // C client schema type enum value
+  std::string name;  // schema name passed to the C client
+  std::string schema;  // schema string passed to the C client
+  pulsar_string_map_t *cProperties;  // raw handle freed in the destructor; NOTE(review): copy operations are not deleted, so a copied object would free it twice
+};
+
+#endif

Reply via email to