This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5036ea  [go schema] support go schema for pulsar-client-go (#3904)
d5036ea is described below

commit d5036eaefcba42e6c9d159ffb14bee597d13380f
Author: 冉小龙 <[email protected]>
AuthorDate: Tue Apr 23 12:59:33 2019 +0800

    [go schema] support go schema for pulsar-client-go (#3904)
    
    
    Signed-off-by: xiaolong.ran [email protected]
    
    Master Issue: #3855
    
    Motivation
    support go schema for pulsar-client-go
---
 pulsar-client-cpp/include/pulsar/Schema.h          |  52 +--
 .../include/pulsar/c/consumer_configuration.h      |   5 +
 .../include/pulsar/c/producer_configuration.h      |  23 +
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |   8 +
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |   7 +
 pulsar-client-go/go.mod                            |   7 +
 pulsar-client-go/go.sum                            |  15 +
 pulsar-client-go/pulsar/c_client.go                |  74 ++-
 pulsar-client-go/pulsar/c_consumer.go              |  60 ++-
 pulsar-client-go/pulsar/c_message.go               |  11 +-
 pulsar-client-go/pulsar/c_producer.go              |  77 +++-
 pulsar-client-go/pulsar/c_reader.go                |  13 +-
 pulsar-client-go/pulsar/client.go                  |   8 +-
 pulsar-client-go/pulsar/consumer.go                |   4 +
 pulsar-client-go/pulsar/consumer_test.go           |   7 +-
 pulsar-client-go/pulsar/message.go                 |   6 +
 pulsar-client-go/pulsar/pb/build.sh                |  23 +
 pulsar-client-go/pulsar/pb/hello.pb.go             | 100 ++++
 pulsar-client-go/pulsar/pb/hello.proto             |  25 +
 pulsar-client-go/pulsar/primitiveSerDe.go          | 318 +++++++++++++
 pulsar-client-go/pulsar/primitiveSerDe_test.go     | 145 ++++++
 pulsar-client-go/pulsar/producer.go                |   2 +
 pulsar-client-go/pulsar/producer_test.go           |   3 +-
 pulsar-client-go/pulsar/reader.go                  |   2 +
 pulsar-client-go/pulsar/reader_test.go             |   2 +-
 pulsar-client-go/pulsar/schema.go                  | 504 +++++++++++++++++++++
 pulsar-client-go/pulsar/schemaDef_test.go          |  58 +++
 pulsar-client-go/pulsar/schema_test.go             | 436 ++++++++++++++++++
 .../pulsar/{util_test.go => testhelps.go}          |   0
 29 files changed, 1935 insertions(+), 60 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Schema.h 
b/pulsar-client-cpp/include/pulsar/Schema.h
index 257ee3a..fe09fb9 100644
--- a/pulsar-client-cpp/include/pulsar/Schema.h
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -40,69 +40,69 @@ enum SchemaType
     STRING = 1,
 
     /**
-     * A 8-byte integer.
+     * JSON object encoding and validation
      */
-    INT8 = 2,
+    JSON = 2,
 
     /**
-     * A 16-byte integer.
+     * Protobuf message encoding and decoding
      */
-    INT16 = 3,
+    PROTOBUF = 3,
 
     /**
-     * A 32-byte integer.
+     * Serialize and deserialize via Avro
      */
-    INT32 = 4,
+    AVRO = 4,
 
     /**
-     * A 64-byte integer.
+     * A 8-byte integer.
      */
-    INT64 = 5,
+    INT8 = 6,
 
     /**
-     * A float number.
+     * A 16-byte integer.
      */
-    FLOAT = 6,
+    INT16 = 7,
 
     /**
-     * A double number
+     * A 32-byte integer.
      */
-    DOUBLE = 7,
+    INT32 = 8,
 
     /**
-     * A bytes array.
+     * A 64-byte integer.
      */
-    BYTES = 8,
+    INT64 = 9,
 
     /**
-     * JSON object encoding and validation
+     * A float number.
      */
-    JSON = 9,
+    FLOAT = 10,
 
     /**
-     * Protobuf message encoding and decoding
+     * A double number
      */
-    PROTOBUF = 10,
+    DOUBLE = 11,
 
     /**
-     * Serialize and deserialize via Avro
+     * A Schema that contains Key Schema and Value Schema.
      */
-    AVRO = 11,
+    KEY_VALUE = 15,
 
     /**
-     * Auto Consume Type.
+     * A bytes array.
      */
-    AUTO_CONSUME = 13,
+    BYTES = -1,
 
     /**
-     * Auto Publish Type.
+     * Auto Consume Type.
      */
-    AUTO_PUBLISH = 14,
+    AUTO_CONSUME = -3,
 
     /**
-     * A Schema that contains Key Schema and Value Schema.
+     * Auto Publish Type.
      */
-    KEY_VALUE = 15,
+    AUTO_PUBLISH = -4,
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 810d062..2a04d1e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include "consumer.h"
+#include "producer_configuration.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -82,6 +83,10 @@ void 
pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configurati
 pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
     pulsar_consumer_configuration_t *consumer_configuration);
 
+void 
pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                   pulsar_schema_type 
schemaType, const char *name,
+                                                   const char *schema, 
pulsar_string_map_t *properties);
+
 /**
  * A message listener enables your application to configure how to process
  * and acknowledge messages delivered. A listener will be called in order
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 670bf50..84b324b 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -43,6 +43,25 @@ typedef enum {
     pulsar_CompressionZLib = 2
 } pulsar_compression_type;
 
+typedef enum {
+    pulsar_None = 0,
+    pulsar_String = 1,
+    pulsar_Json = 2,
+    pulsar_Protobuf = 3,
+    pulsar_Avro = 4,
+    pulsar_Boolean = 5,
+    pulsar_Int8 = 6,
+    pulsar_Int16 = 7,
+    pulsar_Int32 = 8,
+    pulsar_Int64 = 9,
+    pulsar_Float32 = 10,
+    pulsar_Float64 = 11,
+    pulsar_KeyValue = 15,
+    pulsar_Bytes = -1,
+    pulsar_AutoConsume = -3,
+    pulsar_AutoPublish = -4,
+} pulsar_schema_type;
+
 typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
 
 pulsar_producer_configuration_t *pulsar_producer_configuration_create();
@@ -69,6 +88,10 @@ void 
pulsar_producer_configuration_set_compression_type(pulsar_producer_configur
 pulsar_compression_type pulsar_producer_configuration_get_compression_type(
     pulsar_producer_configuration_t *conf);
 
+void 
pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t 
*conf,
+                                                   pulsar_schema_type 
schemaType, const char *name,
+                                                   const char *schema, 
pulsar_string_map_t *properties);
+
 void 
pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t
 *conf,
                                                             int 
maxPendingMessages);
 
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 49cf2be..9b23729 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -40,6 +40,13 @@ pulsar_consumer_type 
pulsar_consumer_configuration_get_consumer_type(
     return 
(pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
 }
 
+void 
pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                   pulsar_schema_type 
schemaType, const char *name,
+                                                   const char *schema, 
pulsar_string_map_t *properties) {
+    auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, 
schema, properties->map);
+    consumer_configuration->consumerConfiguration.setSchema(schemaInfo);
+}
+
 static void message_listener_callback(pulsar::Consumer consumer, const 
pulsar::Message &msg,
                                       pulsar_message_listener listener, void 
*ctx) {
     pulsar_consumer_t c_consumer;
@@ -105,6 +112,7 @@ void pulsar_configure_set_negative_ack_redelivery_delay_ms(
     pulsar_consumer_configuration_t *consumer_configuration, long 
redeliveryDelayMillis) {
     
consumer_configuration->consumerConfiguration.setNegativeAckRedeliveryDelayMs(redeliveryDelayMillis);
 }
+
 long pulsar_configure_get_negative_ack_redelivery_delay_ms(
     pulsar_consumer_configuration_t *consumer_configuration) {
     return 
consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index a4a24c1..6e2b7fc 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -66,6 +66,13 @@ pulsar_compression_type 
pulsar_producer_configuration_get_compression_type(
     return (pulsar_compression_type)conf->conf.getCompressionType();
 }
 
+void 
pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t 
*conf,
+                                                   pulsar_schema_type 
schemaType, const char *name,
+                                                   const char *schema, 
pulsar_string_map_t *properties) {
+    auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, 
schema, properties->map);
+    conf->conf.setSchema(schemaInfo);
+}
+
 void 
pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t
 *conf,
                                                             int 
maxPendingMessages) {
     conf->conf.setMaxPendingMessages(maxPendingMessages);
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
index 1d826eb..ea75273 100644
--- a/pulsar-client-go/go.mod
+++ b/pulsar-client-go/go.mod
@@ -2,8 +2,15 @@ module github.com/apache/pulsar/pulsar-client-go
 
 require (
        github.com/BurntSushi/toml v0.3.1 // indirect
+       github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
+       github.com/davecgh/go-spew v1.1.1
+       github.com/gogo/protobuf v1.2.1
+       github.com/golang/protobuf v1.3.1
+       github.com/golang/snappy v0.0.1 // indirect
+       github.com/linkedin/goavro v2.1.0+incompatible
        github.com/sirupsen/logrus v1.3.0
        github.com/stretchr/testify v1.3.0
+       gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
        gopkg.in/natefinch/lumberjack.v2 v2.0.0
        gopkg.in/yaml.v2 v2.2.2 // indirect
 )
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
index 5f0cd6b..b6177d8 100644
--- a/pulsar-client-go/go.sum
+++ b/pulsar-client-go/go.sum
@@ -1,10 +1,22 @@
 github.com/BurntSushi/toml v0.3.1 
h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 
h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod 
h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
+github.com/gogo/protobuf v1.2.1/go.mod 
h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/protobuf v1.3.1 
h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/kisielk/errcheck v1.1.0/go.mod 
h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/linkedin/goavro v2.1.0+incompatible 
h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod 
h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.3.0 
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
@@ -18,8 +30,11 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 
h1:u+LnwYTOOW7Ukr/fppxEb1
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/linkedin/goavro.v1 v1.0.5 
h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4=
+gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod 
h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
diff --git a/pulsar-client-go/pulsar/c_client.go 
b/pulsar-client-go/pulsar/c_client.go
index 3ade48c..33f523d 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -196,7 +196,7 @@ func (client *client) CreateProducer(options 
ProducerOptions) (Producer, error)
                error
        })
 
-       client.CreateProducerAsync(options, func(producer Producer, err error) {
+       client.CreateProducerAsync(options, nil, func(producer Producer, err 
error) {
                c <- struct {
                        Producer
                        error
@@ -208,8 +208,28 @@ func (client *client) CreateProducer(options 
ProducerOptions) (Producer, error)
        return res.Producer, res.error
 }
 
-func (client *client) CreateProducerAsync(options ProducerOptions, callback 
func(producer Producer, err error)) {
-       createProducerAsync(client, options, callback)
+func (client *client) CreateProducerWithSchema(options ProducerOptions, schema 
Schema) (Producer, error) {
+       // Create is implemented on async create with a channel to wait for
+       // completion without blocking the real thread
+       c := make(chan struct {
+               Producer
+               error
+       })
+
+       client.CreateProducerAsync(options, schema, func(producer Producer, err 
error) {
+               c <- struct {
+                       Producer
+                       error
+               }{producer, err}
+               close(c)
+       })
+
+       res := <-c
+       return res.Producer, res.error
+}
+
+func (client *client) CreateProducerAsync(options ProducerOptions, schema 
Schema, callback func(producer Producer, err error)) {
+       createProducerAsync(client, schema, options, callback)
 }
 
 func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
@@ -218,7 +238,25 @@ func (client *client) Subscribe(options ConsumerOptions) 
(Consumer, error) {
                error
        })
 
-       client.SubscribeAsync(options, func(consumer Consumer, err error) {
+       client.SubscribeAsync(options, nil, func(consumer Consumer, err error) {
+               c <- struct {
+                       Consumer
+                       error
+               }{consumer, err}
+               close(c)
+       })
+
+       res := <-c
+       return res.Consumer, res.error
+}
+
+func (client *client) SubscribeWithSchema(options ConsumerOptions, schema 
Schema) (Consumer, error) {
+       c := make(chan struct {
+               Consumer
+               error
+       })
+
+       client.SubscribeAsync(options, schema, func(consumer Consumer, err 
error) {
                c <- struct {
                        Consumer
                        error
@@ -230,8 +268,8 @@ func (client *client) Subscribe(options ConsumerOptions) 
(Consumer, error) {
        return res.Consumer, res.error
 }
 
-func (client *client) SubscribeAsync(options ConsumerOptions, callback 
func(Consumer, error)) {
-       subscribeAsync(client, options, callback)
+func (client *client) SubscribeAsync(options ConsumerOptions, schema Schema, 
callback func(Consumer, error)) {
+       subscribeAsync(client, options, schema, callback)
 }
 
 func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
@@ -240,7 +278,25 @@ func (client *client) CreateReader(options ReaderOptions) 
(Reader, error) {
                error
        })
 
-       client.CreateReaderAsync(options, func(reader Reader, err error) {
+       client.CreateReaderAsync(options, nil, func(reader Reader, err error) {
+               c <- struct {
+                       Reader
+                       error
+               }{reader, err}
+               close(c)
+       })
+
+       res := <-c
+       return res.Reader, res.error
+}
+
+func (client *client) CreateReaderWithSchema(options ReaderOptions, schema 
Schema) (Reader, error) {
+       c := make(chan struct {
+               Reader
+               error
+       })
+
+       client.CreateReaderAsync(options, schema, func(reader Reader, err 
error) {
                c <- struct {
                        Reader
                        error
@@ -271,8 +327,8 @@ func pulsarGetTopicPartitionsCallbackProxy(res 
C.pulsar_result, cPartitions *C.p
        }
 }
 
-func (client *client) CreateReaderAsync(options ReaderOptions, callback 
func(Reader, error)) {
-       createReaderAsync(client, options, callback)
+func (client *client) CreateReaderAsync(options ReaderOptions, schema Schema, 
callback func(Reader, error)) {
+       createReaderAsync(client, schema, options, callback)
 }
 
 func (client *client) TopicPartitions(topic string) ([]string, error) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 97fa843..ac6d9ed 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+       schema         Schema
        client         *client
        ptr            *C.pulsar_consumer_t
        defaultChannel chan ConsumerMessage
@@ -53,18 +54,20 @@ func pulsarSubscribeCallbackProxy(res C.pulsar_result, ptr 
*C.pulsar_consumer_t,
                cc.callback(nil, newError(res, "Failed to subscribe to topic"))
        } else {
                cc.consumer.ptr = ptr
+               cc.consumer.schema = cc.schema
                runtime.SetFinalizer(cc.consumer, consumerFinalizer)
                cc.callback(cc.consumer, nil)
        }
 }
 
 type subscribeContext struct {
+       schema   Schema
        conf     *C.pulsar_consumer_configuration_t
        consumer *consumer
        callback func(Consumer, error)
 }
 
-func subscribeAsync(client *client, options ConsumerOptions, callback 
func(Consumer, error)) {
+func subscribeAsync(client *client, options ConsumerOptions, schema Schema, 
callback func(Consumer, error)) {
        if options.Topic == "" && options.Topics == nil && 
options.TopicsPattern == "" {
                go callback(nil, newError(C.pulsar_result_InvalidConfiguration, 
"topic is required"))
                return
@@ -109,6 +112,48 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_subscription_initial_position(conf, 
C.initial_position(options.SubscriptionInitPos))
        }
 
+       if schema != nil && schema.GetSchemaInfo() != nil {
+               if schema.GetSchemaInfo().Type != NONE {
+                       cName := C.CString(schema.GetSchemaInfo().Name)
+                       cSchema := C.CString(schema.GetSchemaInfo().Schema)
+                       properties := C.pulsar_string_map_create()
+                       defer C.free(unsafe.Pointer(cName))
+                       defer C.free(unsafe.Pointer(cSchema))
+                       defer C.pulsar_string_map_free(properties)
+
+                       for key, value := range 
schema.GetSchemaInfo().Properties {
+                               cKey := C.CString(key)
+                               cValue := C.CString(value)
+
+                               C.pulsar_string_map_put(properties, cKey, 
cValue)
+
+                               C.free(unsafe.Pointer(cKey))
+                               C.free(unsafe.Pointer(cValue))
+                       }
+                       C.pulsar_consumer_configuration_set_schema_info(conf, 
C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+                               cName, cSchema, properties)
+               } else {
+                       cName := C.CString("BYTES")
+                       cSchema := C.CString("")
+                       properties := C.pulsar_string_map_create()
+                       defer C.free(unsafe.Pointer(cName))
+                       defer C.free(unsafe.Pointer(cSchema))
+                       defer C.pulsar_string_map_free(properties)
+
+                       for key, value := range 
schema.GetSchemaInfo().Properties {
+                               cKey := C.CString(key)
+                               cValue := C.CString(value)
+
+                               C.pulsar_string_map_put(properties, cKey, 
cValue)
+
+                               C.free(unsafe.Pointer(cKey))
+                               C.free(unsafe.Pointer(cValue))
+                       }
+                       C.pulsar_consumer_configuration_set_schema_info(conf, 
C.pulsar_schema_type(BYTES),
+                               cName, cSchema, properties)
+               }
+       }
+
        // ReceiverQueueSize==0 means to use the default queue size
        // -1 means to disable the consumer prefetching
        if options.ReceiverQueueSize > 0 {
@@ -147,7 +192,7 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
        subName := C.CString(options.SubscriptionName)
        defer C.free(unsafe.Pointer(subName))
 
-       callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: 
consumer, callback: callback})
+       callbackPtr := savePointer(&subscribeContext{schema: schema, conf: 
conf, consumer: consumer, callback: callback})
 
        if options.Topic != "" {
                topic := C.CString(options.Topic)
@@ -193,12 +238,15 @@ func pulsarMessageListenerProxy(cConsumer 
*C.pulsar_consumer_t, message *C.pulsa
                        // There was an error when sending channel (eg: already 
closed)
                }
        }()
-
-       cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(message)}
+       cc.channel <- ConsumerMessage{cc.consumer, 
newMessageWrapper(cc.consumer.Schema(), message)}
 }
 
 //// Consumer
 
+func (c *consumer) Schema() Schema {
+       return c.schema
+}
+
 func (c *consumer) Topic() string {
        return C.GoString(C.pulsar_consumer_get_topic(c.ptr))
 }
@@ -210,7 +258,9 @@ func (c *consumer) Subscription() string {
 func (c *consumer) Unsubscribe() error {
        channel := make(chan error)
        c.UnsubscribeAsync(func(err error) {
-               channel <- err; close(channel) })
+               channel <- err
+               close(channel)
+       })
        return <-channel
 }
 
diff --git a/pulsar-client-go/pulsar/c_message.go 
b/pulsar-client-go/pulsar/c_message.go
index 0dfbfa6..c45027a 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -32,7 +32,8 @@ import (
 )
 
 type message struct {
-       ptr *C.pulsar_message_t
+       ptr    *C.pulsar_message_t
+       schema Schema
 }
 
 type messageID struct {
@@ -97,8 +98,8 @@ func buildMessage(message ProducerMessage) 
*C.pulsar_message_t {
 
 ////////////// Message
 
-func newMessageWrapper(ptr *C.pulsar_message_t) Message {
-       msg := &message{ptr: ptr}
+func newMessageWrapper(schema Schema, ptr *C.pulsar_message_t) Message {
+       msg := &message{schema: schema, ptr: ptr}
        runtime.SetFinalizer(msg, messageFinalizer)
        return msg
 }
@@ -107,6 +108,10 @@ func messageFinalizer(msg *message) {
        C.pulsar_message_free(msg.ptr)
 }
 
+func (m *message) GetValue(v interface{}) error {
+       return m.schema.Decode(m.Payload(), v)
+}
+
 func (m *message) Properties() map[string]string {
        cProperties := C.pulsar_message_get_properties(m.ptr)
        defer C.pulsar_string_map_free(cProperties)
diff --git a/pulsar-client-go/pulsar/c_producer.go 
b/pulsar-client-go/pulsar/c_producer.go
index 80fe017..528df30 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -25,6 +25,7 @@ package pulsar
 import "C"
 import (
        "context"
+       "errors"
        "runtime"
        "time"
        "unsafe"
@@ -32,6 +33,7 @@ import (
 
 type createProducerCtx struct {
        client   *client
+       schema   Schema
        callback func(producer Producer, err error)
        conf     *C.pulsar_producer_configuration_t
 }
@@ -45,13 +47,13 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, 
ptr *C.pulsar_produc
        if res != C.pulsar_result_Ok {
                producerCtx.callback(nil, newError(res, "Failed to create 
Producer"))
        } else {
-               p := &producer{client: producerCtx.client, ptr: ptr}
+               p := &producer{client: producerCtx.client, schema: 
producerCtx.schema, ptr: ptr}
                runtime.SetFinalizer(p, producerFinalizer)
                producerCtx.callback(p, nil)
        }
 }
 
-func createProducerAsync(client *client, options ProducerOptions, callback 
func(producer Producer, err error)) {
+func createProducerAsync(client *client, schema Schema, options 
ProducerOptions, callback func(producer Producer, err error)) {
        if options.Topic == "" {
                go callback(nil, newError(C.pulsar_result_InvalidConfiguration, 
"topic is required when creating producer"))
                return
@@ -108,6 +110,48 @@ func createProducerAsync(client *client, options 
ProducerOptions, callback func(
                C.pulsar_producer_configuration_set_compression_type(conf, 
C.pulsar_compression_type(options.CompressionType))
        }
 
+       if schema != nil && schema.GetSchemaInfo() != nil {
+               if schema.GetSchemaInfo().Type != NONE {
+                       cName := C.CString(schema.GetSchemaInfo().Name)
+                       cSchema := C.CString(schema.GetSchemaInfo().Schema)
+                       properties := C.pulsar_string_map_create()
+                       defer C.free(unsafe.Pointer(cName))
+                       defer C.free(unsafe.Pointer(cSchema))
+                       defer C.pulsar_string_map_free(properties)
+
+                       for key, value := range 
schema.GetSchemaInfo().Properties {
+                               cKey := C.CString(key)
+                               cValue := C.CString(value)
+
+                               C.pulsar_string_map_put(properties, cKey, 
cValue)
+
+                               C.free(unsafe.Pointer(cKey))
+                               C.free(unsafe.Pointer(cValue))
+                       }
+                       C.pulsar_producer_configuration_set_schema_info(conf, 
C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+                               cName, cSchema, properties)
+               } else {
+                       cName := C.CString("BYTES")
+                       cSchema := C.CString("")
+                       properties := C.pulsar_string_map_create()
+                       defer C.free(unsafe.Pointer(cName))
+                       defer C.free(unsafe.Pointer(cSchema))
+                       defer C.pulsar_string_map_free(properties)
+
+                       for key, value := range 
schema.GetSchemaInfo().Properties {
+                               cKey := C.CString(key)
+                               cValue := C.CString(value)
+
+                               C.pulsar_string_map_put(properties, cKey, 
cValue)
+
+                               C.free(unsafe.Pointer(cKey))
+                               C.free(unsafe.Pointer(cValue))
+                       }
+                       C.pulsar_producer_configuration_set_schema_info(conf, 
C.pulsar_schema_type(BYTES),
+                               cName, cSchema, properties)
+               }
+       }
+
        if options.MessageRouter != nil {
                C._pulsar_producer_configuration_set_message_router(conf, 
savePointer(&options.MessageRouter))
        }
@@ -141,7 +185,7 @@ func createProducerAsync(client *client, options 
ProducerOptions, callback func(
        defer C.free(unsafe.Pointer(topicName))
 
        C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-               savePointer(createProducerCtx{client, callback, conf}))
+               savePointer(createProducerCtx{client, schema, callback, conf}))
 }
 
 type topicMetadata struct {
@@ -155,7 +199,7 @@ func (tm *topicMetadata) NumPartitions() int {
 //export pulsarRouterCallbackProxy
 func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata 
*C.pulsar_topic_metadata_t, ctx unsafe.Pointer) C.int {
        router := restorePointerNoDelete(ctx).(*func(msg Message, metadata 
TopicMetadata) int)
-       partitionIdx := (*router)(&message{msg}, 
&topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
+       partitionIdx := (*router)(&message{ptr: msg}, 
&topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
        return C.int(partitionIdx)
 }
 
@@ -164,6 +208,7 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, 
metadata *C.pulsar_topic
 type producer struct {
        client *client
        ptr    *C.pulsar_producer_t
+       schema Schema
 }
 
 func producerFinalizer(p *producer) {
@@ -178,6 +223,10 @@ func (p *producer) Name() string {
        return C.GoString(C.pulsar_producer_get_producer_name(p.ptr))
 }
 
+func (p *producer) Schema() Schema {
+       return p.schema
+}
+
 func (p *producer) LastSequenceID() int64 {
        return int64(C.pulsar_producer_get_last_sequence_id(p.ptr))
 }
@@ -212,10 +261,27 @@ func pulsarProducerSendCallbackProxy(res C.pulsar_result, 
message *C.pulsar_mess
 }
 
 func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, 
callback func(ProducerMessage, error)) {
+       if p.schema != nil {
+               if msg.Value == nil {
+                       callback(msg, errors.New("message value is nil, please 
check"))
+                       return
+               }
+               payLoad, err := p.schema.Encode(msg.Value)
+               if err != nil {
+                       callback(msg, errors.New("serialize message value 
error, please check"))
+                       return
+               }
+               msg.Payload = payLoad
+       } else {
+               if msg.Value != nil {
+                       callback(msg, errors.New("message value is set but no 
schema is provided, please check"))
+                       return
+               }
+       }
        cMsg := buildMessage(msg)
        defer C.pulsar_message_free(cMsg)
 
-       C._pulsar_producer_send_async(p.ptr, cMsg, 
savePointer(sendCallback{msg, callback}))
+       C._pulsar_producer_send_async(p.ptr, cMsg, 
savePointer(sendCallback{message: msg, callback: callback}))
 }
 
 func (p *producer) Close() error {
@@ -252,7 +318,6 @@ func (p *producer) FlushAsync(callback func(error)) {
        C._pulsar_producer_flush_async(p.ptr, savePointer(callback))
 }
 
-
 //export pulsarProducerFlushCallbackProxy
 func pulsarProducerFlushCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) 
{
        callback := restorePointer(ctx).(func(error))
diff --git a/pulsar-client-go/pulsar/c_reader.go 
b/pulsar-client-go/pulsar/c_reader.go
index 7336c1a..0abf4f0 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+       schema         Schema
        client         *client
        ptr            *C.pulsar_reader_t
        defaultChannel chan ReaderMessage
@@ -52,18 +53,20 @@ func pulsarCreateReaderCallbackProxy(res C.pulsar_result, 
ptr *C.pulsar_reader_t
                cc.callback(nil, newError(res, "Failed to create Reader"))
        } else {
                cc.reader.ptr = ptr
+               cc.reader.schema = cc.schema
                runtime.SetFinalizer(cc.reader, readerFinalizer)
                cc.callback(cc.reader, nil)
        }
 }
 
 type readerAndCallback struct {
+       schema   Schema
        reader   *reader
        conf     *C.pulsar_reader_configuration_t
        callback func(Reader, error)
 }
 
-func createReaderAsync(client *client, options ReaderOptions, callback 
func(Reader, error)) {
+func createReaderAsync(client *client, schema Schema, options ReaderOptions, 
callback func(Reader, error)) {
        if options.Topic == "" {
                go callback(nil, newError(C.pulsar_result_InvalidConfiguration, 
"topic is required"))
                return
@@ -113,7 +116,7 @@ func createReaderAsync(client *client, options 
ReaderOptions, callback func(Read
        defer C.free(unsafe.Pointer(topic))
 
        C._pulsar_client_create_reader_async(client.ptr, topic, 
options.StartMessageID.(*messageID).ptr,
-               conf, savePointer(&readerAndCallback{reader, conf, callback}))
+               conf, savePointer(&readerAndCallback{schema: schema, reader: 
reader, conf: conf, callback: callback}))
 }
 
 type readerCallback struct {
@@ -132,13 +135,17 @@ func pulsarReaderListenerProxy(cReader 
*C.pulsar_reader_t, message *C.pulsar_mes
                }
        }()
 
-       rc.channel <- ReaderMessage{rc.reader, newMessageWrapper(message)}
+       rc.channel <- ReaderMessage{rc.reader, 
newMessageWrapper(rc.reader.Schema(), message)}
 }
 
 func (r *reader) Topic() string {
        return C.GoString(C.pulsar_reader_get_topic(r.ptr))
 }
 
+func (r *reader) Schema() Schema {
+       return r.schema
+}
+
 func (r *reader) Next(ctx context.Context) (Message, error) {
        select {
        case <-ctx.Done():
diff --git a/pulsar-client-go/pulsar/client.go 
b/pulsar-client-go/pulsar/client.go
index f62a403..5d41229 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -30,7 +30,7 @@ func NewClient(options ClientOptions) (Client, error) {
 }
 
 // Opaque interface that represents the authentication credentials
-type Authentication interface {}
+type Authentication interface{}
 
 // Create new Authentication provider with specified auth token
 func NewAuthenticationToken(token string) Authentication {
@@ -103,16 +103,22 @@ type Client interface {
        // This method will block until the producer is created successfully
        CreateProducer(ProducerOptions) (Producer, error)
 
+       CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)
+
        // Create a `Consumer` by subscribing to a topic.
        //
        // If the subscription does not exist, a new subscription will be 
created and all messages published after the
        // creation will be retained until acknowledged, even if the consumer 
is not connected
        Subscribe(ConsumerOptions) (Consumer, error)
 
+       SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)
+
        // Create a Reader instance.
        // This method will block until the reader is created successfully.
        CreateReader(ReaderOptions) (Reader, error)
 
+       CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)
+
        // Fetch the list of partitions for a given topic
        //
        // If the topic is partitioned, this will return a list of partition 
names.
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index ae6b498..e2503e0 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -124,6 +124,8 @@ type ConsumerOptions struct {
        //  failure or exclusive subscriptions). Attempting to enable it on 
subscriptions to a non-persistent topics or on a
        //  shared subscription, will lead to the subscription call throwing a 
PulsarClientException.
        ReadCompacted bool
+
+       Schema
 }
 
 // An interface that abstracts behavior of Pulsar's consumer
@@ -198,4 +200,6 @@ type Consumer interface {
        // the connected consumers. This is a non blocking call and doesn't 
throw an exception. In case the connection
        // breaks, the messages are redelivered after reconnect.
        RedeliverUnackedMessages()
+
+       Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index b28341f..b34c5c9 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -83,7 +83,7 @@ func TestConsumer(t *testing.T) {
        assert.Nil(t, err)
        defer consumer.Close()
 
-       assert.Equal(t, consumer.Topic(), "persistent://public/default/" + 
topic)
+       assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
        assert.Equal(t, consumer.Subscription(), "my-sub")
 
        ctx := context.Background()
@@ -102,7 +102,7 @@ func TestConsumer(t *testing.T) {
                assert.NotNil(t, msg)
 
                assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", 
i))
-               assert.Equal(t, msg.Topic(), "persistent://public/default/" + 
topic)
+               assert.Equal(t, msg.Topic(), 
"persistent://public/default/"+topic)
                fmt.Println("Send time: ", sendTime)
                fmt.Println("Publish time: ", msg.PublishTime())
                fmt.Println("Receive time: ", recvTime)
@@ -405,7 +405,7 @@ func TestConsumerRegex(t *testing.T) {
        }
 
        for i := 0; i < 20; i++ {
-               ctx, _ = context.WithTimeout(context.Background(), 1 * 
time.Second)
+               ctx, _ = context.WithTimeout(context.Background(), 
1*time.Second)
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.NotNil(t, msg)
@@ -585,6 +585,5 @@ func TestConsumerNegativeAcks(t *testing.T) {
                consumer.Ack(msg)
        }
 
-
        consumer.Unsubscribe()
 }
diff --git a/pulsar-client-go/pulsar/message.go 
b/pulsar-client-go/pulsar/message.go
index d20ed9a..bd2aa38 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -25,6 +25,9 @@ type ProducerMessage struct {
        // Payload for the message
        Payload []byte
 
+       //Value and payload is mutually exclusive, `Value interface{}` for 
schema message.
+       Value interface{}
+
        // Sets the key of the message for routing policy
        Key string
 
@@ -66,6 +69,9 @@ type Message interface {
 
        // Get the key of the message, if any
        Key() string
+
+       //Get the de-serialized value of the message, according the configured
+       GetValue(v interface{}) error
 }
 
 // Identifier for a particular message
diff --git a/pulsar-client-go/pulsar/pb/build.sh 
b/pulsar-client-go/pulsar/pb/build.sh
new file mode 100755
index 0000000..3987795
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/build.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pkg=pb
+protoc --go_out=import_path=${pkg}:. hello.proto
+
diff --git a/pulsar-client-go/pulsar/pb/hello.pb.go 
b/pulsar-client-go/pulsar/pb/hello.pb.go
new file mode 100644
index 0000000..7cdf086
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.pb.go
@@ -0,0 +1,100 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: hello.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Test struct {
+       Num                  int32    `protobuf:"varint,1,opt,name=num,proto3" 
json:"num,omitempty"`
+       Msf                  string   `protobuf:"bytes,2,opt,name=msf,proto3" 
json:"msf,omitempty"`
+       XXX_NoUnkeyedLiteral struct{} `json:"-"`
+       XXX_unrecognized     []byte   `json:"-"`
+       XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Test) Reset()         { *m = Test{} }
+func (m *Test) String() string { return proto.CompactTextString(m) }
+func (*Test) ProtoMessage()    {}
+func (*Test) Descriptor() ([]byte, []int) {
+       return fileDescriptor_hello_38c7a10202078446, []int{0}
+}
+func (m *Test) XXX_Unmarshal(b []byte) error {
+       return xxx_messageInfo_Test.Unmarshal(m, b)
+}
+func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+       return xxx_messageInfo_Test.Marshal(b, m, deterministic)
+}
+func (dst *Test) XXX_Merge(src proto.Message) {
+       xxx_messageInfo_Test.Merge(dst, src)
+}
+func (m *Test) XXX_Size() int {
+       return xxx_messageInfo_Test.Size(m)
+}
+func (m *Test) XXX_DiscardUnknown() {
+       xxx_messageInfo_Test.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Test proto.InternalMessageInfo
+
+func (m *Test) GetNum() int32 {
+       if m != nil {
+               return m.Num
+       }
+       return 0
+}
+
+func (m *Test) GetMsf() string {
+       if m != nil {
+               return m.Msf
+       }
+       return ""
+}
+
+func init() {
+       proto.RegisterType((*Test)(nil), "prototest.Test")
+}
+
+func init() { proto.RegisterFile("hello.proto", 
fileDescriptor_hello_38c7a10202078446) }
+
+var fileDescriptor_hello_38c7a10202078446 = []byte{
+       // 87 bytes of a gzipped FileDescriptorProto
+       0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 
0xce, 0x48, 0xcd, 0xc9,
+       0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x04, 0x53, 0x25, 
0xa9, 0xc5, 0x25, 0x4a,
+       0x5a, 0x5c, 0x2c, 0x21, 0xa9, 0xc5, 0x25, 0x42, 0x02, 0x5c, 0xcc, 0x79, 
0xa5, 0xb9, 0x12, 0x8c,
+       0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x20, 0x26, 0x48, 0x24, 0xb7, 0x38, 0x4d, 
0x82, 0x49, 0x81, 0x51,
+       0x83, 0x33, 0x08, 0xc4, 0x4c, 0x62, 0x03, 0x6b, 0x33, 0x06, 0x04, 0x00, 
0x00, 0xff, 0xff, 0xc5,
+       0x3d, 0x96, 0x7b, 0x4c, 0x00, 0x00, 0x00,
+}
diff --git a/pulsar-client-go/pulsar/pb/hello.proto 
b/pulsar-client-go/pulsar/pb/hello.proto
new file mode 100644
index 0000000..547e273
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.proto
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+package prototest;
+
+message Test {
+    int32 num = 1;
+    string msf = 2;
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe.go 
b/pulsar-client-go/pulsar/primitiveSerDe.go
new file mode 100644
index 0000000..767d26d
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe.go
@@ -0,0 +1,318 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+       "math"
+)
+
+const (
+       IoMaxSize     = 1024
+       maxBorrowSize = 10
+)
+
+var (
+       bigEndian = binary.BigEndian
+)
+
+type BinaryFreeList chan []byte
+
+var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
+
+func (b BinaryFreeList) Borrow() (buf []byte) {
+       select {
+       case buf = <-b:
+       default:
+               buf = make([]byte, maxBorrowSize)
+
+       }
+       return buf[:maxBorrowSize]
+}
+
+func (b BinaryFreeList) Return(buf []byte) {
+       select {
+       case b <- buf:
+       default:
+       }
+}
+
+func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) {
+       buf := b.Borrow()[:1]
+       if _, err := io.ReadFull(r, buf); err != nil {
+               b.Return(buf)
+               return 0, err
+       }
+       rv := buf[0]
+       b.Return(buf)
+       return rv, nil
+}
+
+func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) 
(uint16, error) {
+       buf := b.Borrow()[:2]
+       if _, err := io.ReadFull(r, buf); err != nil {
+               b.Return(buf)
+               return 0, err
+       }
+       rv := byteOrder.Uint16(buf)
+       b.Return(buf)
+       return rv, nil
+}
+
+func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) 
(uint32, error) {
+       buf := b.Borrow()[:4]
+       if _, err := io.ReadFull(r, buf); err != nil {
+               b.Return(buf)
+               return 0, err
+       }
+       rv := byteOrder.Uint32(buf)
+       b.Return(buf)
+       return rv, nil
+}
+
+func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) 
(uint64, error) {
+       buf := b.Borrow()[:8]
+       if _, err := io.ReadFull(r, buf); err != nil {
+               b.Return(buf)
+               return 0, err
+       }
+       rv := byteOrder.Uint64(buf)
+       b.Return(buf)
+       return rv, nil
+}
+
+func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
+       if len(buf) < 8 {
+               return 0, fmt.Errorf("cannot decode binary double: %s", 
io.ErrShortBuffer)
+       }
+       return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
+}
+
+func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
+       if len(buf) < 4 {
+               return 0, fmt.Errorf("cannot decode binary float: %s", 
io.ErrShortBuffer)
+       }
+       return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
+}
+
+func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error {
+       buf := b.Borrow()[:1]
+       buf[0] = val
+       _, err := w.Write(buf)
+       b.Return(buf)
+       return err
+}
+
+func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val 
uint16) error {
+       buf := b.Borrow()[:2]
+       byteOrder.PutUint16(buf, val)
+       _, err := w.Write(buf)
+       b.Return(buf)
+       return err
+}
+
+func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val 
uint32) error {
+
+       buf := b.Borrow()[:4]
+       byteOrder.PutUint32(buf, val)
+       _, err := w.Write(buf)
+       b.Return(buf)
+       return err
+}
+
+func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val 
uint64) error {
+       buf := b.Borrow()[:8]
+       byteOrder.PutUint64(buf, val)
+       _, err := w.Write(buf)
+       b.Return(buf)
+       return err
+}
+
+func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) {
+       var value float64
+       switch v := datum.(type) {
+       case float64:
+               value = v
+       case float32:
+               value = float64(v)
+       case int:
+               if value = float64(v); int(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int would lose precision: %d", v)
+               }
+       case int64:
+               if value = float64(v); int64(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int64 would lose precision: %d", v)
+               }
+       case int32:
+               if value = float64(v); int32(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int32 would lose precision: %d", v)
+               }
+       default:
+               return nil, fmt.Errorf("serialize failed: expected: Go numeric; 
received: %T", datum)
+       }
+       var buf []byte
+       buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0)
+       binary.BigEndian.PutUint64(buf[len(buf)-8:], math.Float64bits(value))
+       return buf, nil
+}
+
+func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) {
+       var value float32
+       switch v := datum.(type) {
+       case float32:
+               value = v
+       case float64:
+               value = float32(v)
+       case int:
+               if value = float32(v); int(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int would lose precision: %d", v)
+               }
+       case int64:
+               if value = float32(v); int64(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int64 would lose precision: %d", v)
+               }
+       case int32:
+               if value = float32(v); int32(value) != v {
+                       return nil, fmt.Errorf("serialize failed: provided Go 
int32 would lose precision: %d", v)
+               }
+       default:
+               return nil, fmt.Errorf("serialize failed: expected: Go numeric; 
received: %T", datum)
+       }
+       var buf []byte
+       buf = append(buf, 0, 0, 0, 0)
+       binary.BigEndian.PutUint32(buf[len(buf)-4:], 
uint32(math.Float32bits(value)))
+       return buf, nil
+}
+
+func ReadElements(r io.Reader, elements ...interface{}) error {
+       for _, element := range elements {
+               err := readElement(r, element)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func WriteElements(w io.Writer, elements ...interface{}) error {
+       for _, element := range elements {
+               err := writeElement(w, element)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func readElement(r io.Reader, element interface{}) error {
+       switch e := element.(type) {
+       case *int8:
+               rv, err := BinarySerializer.Uint8(r)
+               if err != nil {
+                       return err
+               }
+               *e = int8(rv)
+               return nil
+
+       case *int16:
+               rv, err := BinarySerializer.Uint16(r, bigEndian)
+               if err != nil {
+                       return err
+               }
+               *e = int16(rv)
+               return nil
+
+       case *int32:
+               rv, err := BinarySerializer.Uint32(r, bigEndian)
+               if err != nil {
+                       return err
+               }
+               *e = int32(rv)
+               return nil
+
+       case *int64:
+               rv, err := BinarySerializer.Uint64(r, bigEndian)
+               if err != nil {
+                       return err
+               }
+               *e = int64(rv)
+               return nil
+
+       case *bool:
+               rv, err := BinarySerializer.Uint8(r)
+               if err != nil {
+                       return err
+               }
+               if rv == 0x00 {
+                       *e = false
+               } else {
+                       *e = true
+               }
+               return nil
+       }
+       return binary.Read(r, bigEndian, element)
+}
+
+func writeElement(w io.Writer, element interface{}) error {
+       switch e := element.(type) {
+       case int8:
+               err := BinarySerializer.PutUint8(w, uint8(e))
+               if err != nil {
+                       return err
+               }
+               return nil
+
+       case int16:
+               err := BinarySerializer.PutUint16(w, bigEndian, uint16(e))
+               if err != nil {
+                       return err
+               }
+               return nil
+
+       case int32:
+               err := BinarySerializer.PutUint32(w, bigEndian, uint32(e))
+               if err != nil {
+                       return err
+               }
+               return nil
+
+       case int64:
+               err := BinarySerializer.PutUint64(w, bigEndian, uint64(e))
+               if err != nil {
+                       return err
+               }
+               return nil
+
+       case bool:
+               var err error
+               if e {
+                       err = BinarySerializer.PutUint8(w, 0x01)
+               } else {
+                       err = BinarySerializer.PutUint8(w, 0x00)
+               }
+               if err != nil {
+                       return err
+               }
+               return nil
+       }
+       return binary.Write(w, bigEndian, element)
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe_test.go 
b/pulsar-client-go/pulsar/primitiveSerDe_test.go
new file mode 100644
index 0000000..f2f0335
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe_test.go
@@ -0,0 +1,145 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+       "bytes"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/davecgh/go-spew/spew"
+)
+
+func TestWriteElements(t *testing.T) {
+       tests := []struct {
+               in  interface{}
+               buf []byte
+       }{
+               {int8(1), []byte{0x01}},
+               {uint8(2), []byte{0x02}},
+               {int16(4), []byte{0x04, 0x00}},
+               {uint16(16), []byte{0x10, 0x00}},
+               {int32(1), []byte{0x01, 0x00, 0x00, 0x00}},
+               {uint32(256), []byte{0x00, 0x01, 0x00, 0x00}},
+               {
+                       int64(65536),
+                       []byte{0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00},
+               },
+               {
+                       uint64(4294967296),
+                       []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00},
+               },
+               {
+                       true,
+                       []byte{0x01},
+               },
+               {
+                       false,
+                       []byte{0x00},
+               },
+       }
+
+       t.Logf("Running %d tests", len(tests))
+       for i, test := range tests {
+               value := test
+
+               var buf bytes.Buffer
+               err := WriteElements(&buf, value.in)
+               if err != nil {
+                       t.Errorf("writeElement #%d error %v", i, err)
+                       continue
+               }
+               if !bytes.Equal(buf.Bytes(), test.buf) {
+                       t.Error(test.in)
+                       t.Errorf("writeElement #%d\n got: %s want: %s", i,
+                               spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
+                       continue
+               }
+
+               // Read from wire format.
+               rbuf := bytes.NewReader(test.buf)
+               val := test.in
+               if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+                       val = reflect.New(reflect.TypeOf(test.in)).Interface()
+               }
+               err = ReadElements(rbuf, val)
+               if err != nil {
+                       t.Errorf("readElement #%d error %v", i, err)
+                       continue
+               }
+               ival := val
+               if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+                       ival = 
reflect.Indirect(reflect.ValueOf(val)).Interface()
+               }
+               if !reflect.DeepEqual(ival, test.in) {
+                       t.Errorf("readElement #%d\n got: %s want: %s", i,
+                               spew.Sdump(ival), spew.Sdump(test.in))
+                       continue
+               }
+       }
+}
+
+func TestElementErrors(t *testing.T) {
+       tests := []struct {
+               in       interface{}
+               max      int
+               writeErr error
+               readErr  error
+       }{
+               {int8(1), 0, nil, io.EOF},
+               {uint8(2), 0, nil, io.EOF},
+               {int16(4), 0, nil, io.EOF},
+               {uint16(16), 0, nil, io.EOF},
+               {int32(1), 0, nil, io.EOF},
+               {uint32(256), 0, nil, io.EOF},
+               {
+                       int64(65536),
+                       0, nil, io.EOF,
+               },
+               {
+                       uint64(4294967296),
+                       0, nil, io.EOF,
+               },
+               {
+                       true,
+                       0, nil, io.EOF,
+               },
+               {
+                       false,
+                       0, nil, io.EOF,
+               },
+       }
+
+       t.Logf("Running %d tests", len(tests))
+       for i, test := range tests {
+               var r bytes.Reader
+               val := test.in
+               if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+                       val = reflect.New(reflect.TypeOf(test.in)).Interface()
+               }
+               err := ReadElements(&r, val)
+               if err != test.readErr {
+                       t.Errorf("readElement #%d wrong error got: %v, want: 
%v",
+                               i, err, test.readErr)
+                       continue
+               }
+       }
+}
diff --git a/pulsar-client-go/pulsar/producer.go 
b/pulsar-client-go/pulsar/producer.go
index 17aea45..5e199f7 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -184,4 +184,6 @@ type Producer interface {
        // No more writes will be accepted from this producer. Waits until all 
pending write request are persisted. In case
        // of errors, pending writes will not be retried.
        Close() error
+
+       Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/producer_test.go 
b/pulsar-client-go/pulsar/producer_test.go
index 633714b..df033a4 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -118,8 +118,7 @@ func TestProducerNoTopic(t *testing.T) {
 
        defer client.Close()
 
-       producer, err := client.CreateProducer(ProducerOptions{
-       })
+       producer, err := client.CreateProducer(ProducerOptions{})
 
        // Expect error in creating producer
        assert.Nil(t, producer)
diff --git a/pulsar-client-go/pulsar/reader.go 
b/pulsar-client-go/pulsar/reader.go
index 5592630..51ade6b 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -81,4 +81,6 @@ type Reader interface {
 
        // Close the reader and stop the broker to push more messages
        Close() error
+
+       Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go 
b/pulsar-client-go/pulsar/reader_test.go
index e6cb9f2..01247ce 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -75,7 +75,7 @@ func TestReader(t *testing.T) {
        assert.Nil(t, err)
        defer reader.Close()
 
-       assert.Equal(t, reader.Topic(), "persistent://public/default/" + topic )
+       assert.Equal(t, reader.Topic(), "persistent://public/default/"+topic)
 
        ctx := context.Background()
 
diff --git a/pulsar-client-go/pulsar/schema.go 
b/pulsar-client-go/pulsar/schema.go
new file mode 100644
index 0000000..4a6f7a1
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema.go
@@ -0,0 +1,504 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+       "bytes"
+       "encoding/json"
+       "errors"
+       "reflect"
+       "unsafe"
+
+       log "github.com/apache/pulsar/pulsar-client-go/logutil"
+
+       "github.com/gogo/protobuf/proto"
+       "github.com/linkedin/goavro"
+)
+
+type SchemaType int
+
+const (
+       NONE         SchemaType = iota //No schema defined
+       STRING                         //Simple String encoding with UTF-8
+       JSON                           //JSON object encoding and validation
+       PROTOBUF                       //Protobuf message encoding and decoding
+       AVRO                           //Serialize and deserialize via Avro
+       BOOLEAN                        //
+       INT8                           //A 8-byte integer.
+       INT16                          //A 16-byte integer.
+       INT32                          //A 32-byte integer.
+       INT64                          //A 64-byte integer.
+       FLOAT                          //A float number.
+       DOUBLE                         //A double number
+       _                              //
+       _                              //
+       _                              //
+       KEY_VALUE                      //A Schema that contains Key Schema and 
Value Schema.
+       BYTES        = -1              //A bytes array.
+       AUTO         = -2              //
+       AUTO_CONSUME = -3              //Auto Consume Type.
+       AUTO_PUBLISH = -4              // Auto Publish Type.
+)
+
+// Encapsulates data around the schema definition
+type SchemaInfo struct {
+       Name   string
+       Schema string
+       Type   SchemaType
+       Properties map[string]string
+}
+
+type Schema interface {
+       Encode(v interface{}) ([]byte, error)
+       Decode(data []byte, v interface{}) error
+       Validate(message []byte) error
+       GetSchemaInfo() *SchemaInfo
+}
+
+type AvroCodec struct {
+       Codec *goavro.Codec
+}
+
+func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec {
+       schemaDef := &AvroCodec{
+               Codec: schema,
+       }
+       return schemaDef
+}
+
+// initAvroCodec returns a Codec used to translate between a byte slice of 
either
+// binary or textual Avro data and native Go data.
+func initAvroCodec(codec string) (*goavro.Codec, error) {
+       return goavro.NewCodec(codec)
+}
+
+type JsonSchema struct {
+       AvroCodec
+       SchemaInfo
+}
+
+func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) 
*JsonSchema {
+       js := new(JsonSchema)
+       avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
+       if err != nil {
+               log.Fatalf("init codec error:%v", err)
+       }
+       schemaDef := NewSchemaDefinition(avroCodec)
+       js.SchemaInfo.Schema = schemaDef.Codec.Schema()
+       js.SchemaInfo.Type = JSON
+       js.SchemaInfo.Properties = properties
+       js.SchemaInfo.Name = "Json"
+       return js
+}
+
+func (js *JsonSchema) Encode(data interface{}) ([]byte, error) {
+       return json.Marshal(data)
+}
+
+func (js *JsonSchema) Decode(data []byte, v interface{}) error {
+       return json.Unmarshal(data, v)
+}
+
+func (js *JsonSchema) Validate(message []byte) error {
+       return js.Decode(message, nil)
+}
+
+func (js *JsonSchema) GetSchemaInfo() *SchemaInfo {
+       return &js.SchemaInfo
+}
+
+type ProtoSchema struct {
+       AvroCodec
+       SchemaInfo
+}
+
+func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) 
*ProtoSchema {
+       ps := new(ProtoSchema)
+       avroCodec, err := initAvroCodec(protoAvroSchemaDef)
+       if err != nil {
+               log.Fatalf("init codec error:%v", err)
+       }
+       schemaDef := NewSchemaDefinition(avroCodec)
+       ps.AvroCodec.Codec = schemaDef.Codec
+       ps.SchemaInfo.Schema = schemaDef.Codec.Schema()
+       ps.SchemaInfo.Type = PROTOBUF
+       ps.SchemaInfo.Properties = properties
+       ps.SchemaInfo.Name = "Proto"
+       return ps
+}
+
+func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
+       return proto.Marshal(data.(proto.Message))
+}
+
+func (ps *ProtoSchema) Decode(data []byte, v interface{}) error {
+       return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (ps *ProtoSchema) Validate(message []byte) error {
+       return ps.Decode(message, nil)
+}
+
+func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
+       return &ps.SchemaInfo
+}
+
+type AvroSchema struct {
+       AvroCodec
+       SchemaInfo
+}
+
+func NewAvroSchema(avroSchemaDef string, properties map[string]string) 
*AvroSchema {
+       as := new(AvroSchema)
+       avroCodec, err := initAvroCodec(avroSchemaDef)
+       if err != nil {
+               log.Fatalf("init codec error:%v", err)
+       }
+       schemaDef := NewSchemaDefinition(avroCodec)
+       as.AvroCodec.Codec = schemaDef.Codec
+       as.SchemaInfo.Schema = schemaDef.Codec.Schema()
+       as.SchemaInfo.Type = AVRO
+       as.SchemaInfo.Name = "Avro"
+       as.SchemaInfo.Properties = properties
+       return as
+}
+
+func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
+       textual, err := json.Marshal(data)
+       if err != nil {
+               log.Errorf("serialize data error:%s", err.Error())
+               return nil, err
+       }
+       native, _, err := as.Codec.NativeFromTextual(textual)
+       if err != nil {
+               log.Errorf("convert native Go form to binary Avro data 
error:%s", err.Error())
+               return nil, err
+       }
+       return as.Codec.BinaryFromNative(nil, native)
+}
+
+func (as *AvroSchema) Decode(data []byte, v interface{}) error {
+       native, _, err := as.Codec.NativeFromBinary(data)
+       if err != nil {
+               log.Errorf("convert binary Avro data back to native Go form 
error:%s", err.Error())
+               return err
+       }
+       textual, err := as.Codec.TextualFromNative(nil, native)
+       if err != nil {
+               log.Errorf("convert native Go form to textual Avro data 
error:%s", err.Error())
+               return err
+       }
+       err = json.Unmarshal(textual, v)
+       if err != nil {
+               log.Errorf("unSerialize textual error:%s", err.Error())
+               return err
+       }
+       return nil
+}
+
+func (as *AvroSchema) Validate(message []byte) error {
+       return as.Decode(message, nil)
+}
+
+func (as *AvroSchema) GetSchemaInfo() *SchemaInfo {
+       return &as.SchemaInfo
+}
+
+type StringSchema struct {
+       SchemaInfo
+}
+
+func NewStringSchema(properties map[string]string) *StringSchema {
+       strSchema := new(StringSchema)
+       strSchema.SchemaInfo.Properties = properties
+       strSchema.SchemaInfo.Name = "String"
+       strSchema.SchemaInfo.Type = STRING
+       strSchema.SchemaInfo.Schema = ""
+       return strSchema
+}
+
+func (ss *StringSchema) Encode(v interface{}) ([]byte, error) {
+       return []byte(v.(string)), nil
+}
+
+func (ss *StringSchema) Decode(data []byte, v interface{}) error {
+       bh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
+       sh := reflect.StringHeader{
+               Data: bh.Data,
+               Len:  bh.Len,
+       }
+       shPtr := (*string)(unsafe.Pointer(&sh))
+       reflect.ValueOf(v).Elem().Set(reflect.ValueOf(shPtr))
+       return nil
+}
+
+func (ss *StringSchema) Validate(message []byte) error {
+       return ss.Decode(message, nil)
+}
+
+func (ss *StringSchema) GetSchemaInfo() *SchemaInfo {
+       return &ss.SchemaInfo
+}
+
+type BytesSchema struct {
+       SchemaInfo
+}
+
+func NewBytesSchema(properties map[string]string) *BytesSchema {
+       bytesSchema := new(BytesSchema)
+       bytesSchema.SchemaInfo.Properties = properties
+       bytesSchema.SchemaInfo.Name = "Bytes"
+       bytesSchema.SchemaInfo.Type = BYTES
+       bytesSchema.SchemaInfo.Schema = ""
+       return bytesSchema
+}
+
+func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) {
+       return data.([]byte), nil
+}
+
+func (bs *BytesSchema) Decode(data []byte, v interface{}) error {
+       reflect.ValueOf(v).Elem().Set(reflect.ValueOf(data))
+       return nil
+}
+
+func (bs *BytesSchema) Validate(message []byte) error {
+       return bs.Decode(message, nil)
+}
+
+func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {
+       return &bs.SchemaInfo
+}
+
+type Int8Schema struct {
+       SchemaInfo
+}
+
+func NewInt8Schema(properties map[string]string) *Int8Schema {
+       int8Schema := new(Int8Schema)
+       int8Schema.SchemaInfo.Properties = properties
+       int8Schema.SchemaInfo.Schema = ""
+       int8Schema.SchemaInfo.Type = INT8
+       int8Schema.SchemaInfo.Name = "INT8"
+       return int8Schema
+}
+
+func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) {
+       var buf bytes.Buffer
+       err := WriteElements(&buf, value.(int8))
+       return buf.Bytes(), err
+}
+
+func (is8 *Int8Schema) Decode(data []byte, v interface{}) error {
+       buf := bytes.NewReader(data)
+       return ReadElements(buf, v)
+}
+
+func (is8 *Int8Schema) Validate(message []byte) error {
+       if len(message) != 1 {
+               return errors.New("size of data received by Int8Schema is not 
1")
+       }
+       return nil
+}
+
+func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo {
+       return &is8.SchemaInfo
+}
+
+type Int16Schema struct {
+       SchemaInfo
+}
+
+func NewInt16Schema(properties map[string]string) *Int16Schema {
+       int16Schema := new(Int16Schema)
+       int16Schema.SchemaInfo.Properties = properties
+       int16Schema.SchemaInfo.Name = "INT16"
+       int16Schema.SchemaInfo.Type = INT16
+       int16Schema.SchemaInfo.Schema = ""
+       return int16Schema
+}
+
+func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) {
+       var buf bytes.Buffer
+       err := WriteElements(&buf, value.(int16))
+       return buf.Bytes(), err
+}
+
+func (is16 *Int16Schema) Decode(data []byte, v interface{}) error {
+       buf := bytes.NewReader(data)
+       return ReadElements(buf, v)
+}
+
+func (is16 *Int16Schema) Validate(message []byte) error {
+       if len(message) != 2 {
+               return errors.New("size of data received by Int16Schema is not 
2")
+       }
+       return nil
+}
+
+func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo {
+       return &is16.SchemaInfo
+}
+
+type Int32Schema struct {
+       SchemaInfo
+}
+
+func NewInt32Schema(properties map[string]string) *Int32Schema {
+       int32Schema := new(Int32Schema)
+       int32Schema.SchemaInfo.Properties = properties
+       int32Schema.SchemaInfo.Schema = ""
+       int32Schema.SchemaInfo.Name = "INT32"
+       int32Schema.SchemaInfo.Type = INT32
+       return int32Schema
+}
+
+func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) {
+       var buf bytes.Buffer
+       err := WriteElements(&buf, value.(int32))
+       return buf.Bytes(), err
+}
+
+func (is32 *Int32Schema) Decode(data []byte, v interface{}) error {
+       buf := bytes.NewReader(data)
+       return ReadElements(buf, v)
+}
+
+func (is32 *Int32Schema) Validate(message []byte) error {
+       if len(message) != 4 {
+               return errors.New("size of data received by Int32Schema is not 
4")
+       }
+       return nil
+}
+
+func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo {
+       return &is32.SchemaInfo
+}
+
+type Int64Schema struct {
+       SchemaInfo
+}
+
+func NewInt64Schema(properties map[string]string) *Int64Schema {
+       int64Schema := new(Int64Schema)
+       int64Schema.SchemaInfo.Properties = properties
+       int64Schema.SchemaInfo.Name = "INT64"
+       int64Schema.SchemaInfo.Type = INT64
+       int64Schema.SchemaInfo.Schema = ""
+       return int64Schema
+}
+
+func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) {
+       var buf bytes.Buffer
+       err := WriteElements(&buf, value.(int64))
+       return buf.Bytes(), err
+}
+
+func (is64 *Int64Schema) Decode(data []byte, v interface{}) error {
+       buf := bytes.NewReader(data)
+       return ReadElements(buf, v)
+}
+
+func (is64 *Int64Schema) Validate(message []byte) error {
+       if len(message) != 8 {
+               return errors.New("size of data received by Int64Schema is not 
8")
+       }
+       return nil
+}
+
+func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo {
+       return &is64.SchemaInfo
+}
+
+type FloatSchema struct {
+       SchemaInfo
+}
+
+func NewFloatSchema(properties map[string]string) *FloatSchema {
+       floatSchema := new(FloatSchema)
+       floatSchema.SchemaInfo.Properties = properties
+       floatSchema.SchemaInfo.Type = FLOAT
+       floatSchema.SchemaInfo.Name = "FLOAT"
+       floatSchema.SchemaInfo.Schema = ""
+       return floatSchema
+}
+
+func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) {
+       return BinarySerializer.PutFloat(value)
+}
+
+func (fs *FloatSchema) Decode(data []byte, v interface{}) error {
+       floatValue, err := BinarySerializer.Float32(data)
+       if err != nil {
+               log.Errorf("unSerialize float error:%s", err.Error())
+               return err
+       }
+       reflect.ValueOf(v).Elem().Set(reflect.ValueOf(floatValue))
+       return nil
+}
+
+func (fs *FloatSchema) Validate(message []byte) error {
+       if len(message) != 4 {
+               return errors.New("size of data received by FloatSchema is not 
4")
+       }
+       return nil
+}
+
+func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo {
+       return &fs.SchemaInfo
+}
+
+type DoubleSchema struct {
+       SchemaInfo
+}
+
+func NewDoubleSchema(properties map[string]string) *DoubleSchema {
+       doubleSchema := new(DoubleSchema)
+       doubleSchema.SchemaInfo.Properties = properties
+       doubleSchema.SchemaInfo.Type = DOUBLE
+       doubleSchema.SchemaInfo.Name = "DOUBLE"
+       doubleSchema.SchemaInfo.Schema = ""
+       return doubleSchema
+}
+
+func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) {
+       return BinarySerializer.PutDouble(value)
+}
+
+func (ds *DoubleSchema) Decode(data []byte, v interface{}) error {
+       doubleValue, err := BinarySerializer.Float64(data)
+       if err != nil {
+               log.Errorf("unSerialize double value error:%s", err.Error())
+               return err
+       }
+       reflect.ValueOf(v).Elem().Set(reflect.ValueOf(doubleValue))
+       return nil
+}
+
+func (ds *DoubleSchema) Validate(message []byte) error {
+       if len(message) != 8 {
+               return errors.New("size of data received by DoubleSchema is not 
8")
+       }
+       return nil
+}
+
+func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo {
+       return &ds.SchemaInfo
+}
diff --git a/pulsar-client-go/pulsar/schemaDef_test.go 
b/pulsar-client-go/pulsar/schemaDef_test.go
new file mode 100644
index 0000000..5fb6803
--- /dev/null
+++ b/pulsar-client-go/pulsar/schemaDef_test.go
@@ -0,0 +1,58 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestSchemaDef(t *testing.T) {
+       errSchemaDef := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"int64\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       _, err := initAvroCodec(errSchemaDef)
+       assert.NotNil(t, err)
+
+       errSchemaDef1 := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"bool\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       _, err = initAvroCodec(errSchemaDef1)
+       assert.NotNil(t, err)
+
+       errSchemaDef2 := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"float32\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       _, err = initAvroCodec(errSchemaDef2)
+       assert.NotNil(t, err)
+
+       errSchemaDef3 := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"float64\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       _, err = initAvroCodec(errSchemaDef3)
+       assert.NotNil(t, err)
+
+       errSchemaDef4 := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       _, err = initAvroCodec(errSchemaDef4)
+       assert.NotNil(t, err)
+
+       errSchemaDef5 := 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer$\","
 +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\":[\"null\",\"string\"],\"default\":null\"}]}"
+       _, err = initAvroCodec(errSchemaDef5)
+       assert.NotNil(t, err)
+}
diff --git a/pulsar-client-go/pulsar/schema_test.go 
b/pulsar-client-go/pulsar/schema_test.go
new file mode 100644
index 0000000..3c83188
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema_test.go
@@ -0,0 +1,436 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+       "context"
+       "testing"
+
+       log "github.com/apache/pulsar/pulsar-client-go/logutil"
+       "github.com/apache/pulsar/pulsar-client-go/pulsar/pb"
+       "github.com/stretchr/testify/assert"
+)
+
+type testJson struct {
+       ID   int    `json:"id"`
+       Name string `json:"name"`
+}
+
+type testAvro struct {
+       ID   int
+       Name string
+}
+
+var (
+       exampleSchemaDef = 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+       protoSchemaDef = 
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+               
"\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
+)
+
+func createClient() Client {
+       // create client
+       lookupUrl := "pulsar://localhost:6650"
+       client, err := NewClient(ClientOptions{
+               URL: lookupUrl,
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       return client
+}
+
+func TestJsonSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       jsonSchema := NewJsonSchema(exampleSchemaDef, nil)
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "jsonTopic",
+       }, jsonSchema)
+       err = producer.Send(context.Background(), ProducerMessage{
+               Value: &testJson{
+                       ID:   100,
+                       Name: "pulsar",
+               },
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       properties := make(map[string]string)
+       properties["pulsar"]="hello"
+       jsonSchemaWithProperties := NewJsonSchema(exampleSchemaDef, properties)
+       producer1, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "jsonTopic",
+       }, jsonSchemaWithProperties)
+       err = producer1.Send(context.Background(), ProducerMessage{
+               Value: &testJson{
+                       ID:   100,
+                       Name: "pulsar",
+               },
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       defer producer1.Close()
+
+       //create consumer
+       var s testJson
+
+       consumerJS := NewJsonSchema(exampleSchemaDef, nil)
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "jsonTopic",
+               SubscriptionName: "sub-2",
+       }, consumerJS)
+       assert.Nil(t, err)
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&s)
+       assert.Nil(t, err)
+       assert.Equal(t, s.ID, 100)
+       assert.Equal(t, s.Name, "pulsar")
+
+       defer consumer.Close()
+}
+
+func TestProtoSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       // create producer
+       psProducer := NewProtoSchema(protoSchemaDef, nil)
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "proto",
+       }, psProducer)
+       if err := producer.Send(context.Background(), ProducerMessage{
+               Value: &pb.Test{
+                       Num: 100,
+                       Msf: "pulsar",
+               },
+       }); err != nil {
+               log.Fatal(err)
+       }
+
+       //create consumer
+       unobj := pb.Test{}
+       psConsumer := NewProtoSchema(protoSchemaDef, nil)
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "proto",
+               SubscriptionName: "sub-1",
+       }, psConsumer)
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&unobj)
+       assert.Nil(t, err)
+       assert.Equal(t, unobj.Num, int32(100))
+       assert.Equal(t, unobj.Msf, "pulsar")
+       defer consumer.Close()
+}
+
+func TestAvroSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       // create producer
+       asProducer := NewAvroSchema(exampleSchemaDef, nil)
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "avro-topic",
+       }, asProducer)
+       assert.Nil(t, err)
+       if err := producer.Send(context.Background(), ProducerMessage{
+               Value: testAvro{
+                       ID:   100,
+                       Name: "pulsar",
+               },
+       }); err != nil {
+               log.Fatal(err)
+       }
+
+       //create consumer
+       unobj := testAvro{}
+
+       asConsumer := NewAvroSchema(exampleSchemaDef, nil)
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "avro-topic",
+               SubscriptionName: "sub-1",
+       }, asConsumer)
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&unobj)
+       assert.Nil(t, err)
+       assert.Equal(t, unobj.ID, 100)
+       assert.Equal(t, unobj.Name, "pulsar")
+       defer consumer.Close()
+}
+
+func TestStringSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       ssProducer := NewStringSchema(nil)
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "strTopic",
+       }, ssProducer)
+       assert.Nil(t, err)
+       if err := producer.Send(context.Background(), ProducerMessage{
+               Value: "hello pulsar",
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       var res *string
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "strTopic",
+               SubscriptionName: "sub-2",
+       }, NewStringSchema(nil))
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Equal(t, *res, "hello pulsar")
+
+       defer consumer.Close()
+}
+
+func TestBytesSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       bytes := []byte{121, 110, 121, 110}
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "bytesTopic",
+       }, NewBytesSchema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: bytes,
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       var res []byte
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "bytesTopic",
+               SubscriptionName: "sub-2",
+       }, NewBytesSchema(nil))
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Equal(t, res, bytes)
+
+       defer consumer.Close()
+}
+
+func TestInt8Schema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "int8Topic1",
+       }, NewInt8Schema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: int8(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "int8Topic1",
+               SubscriptionName: "sub-2",
+       }, NewInt8Schema(nil))
+       assert.Nil(t, err)
+
+       var res int8
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, int8(1))
+
+       defer consumer.Close()
+}
+
+func TestInt16Schema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "int16Topic",
+       }, NewInt16Schema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: int16(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "int16Topic",
+               SubscriptionName: "sub-2",
+       }, NewInt16Schema(nil))
+       assert.Nil(t, err)
+
+       var res int16
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, int16(1))
+       defer consumer.Close()
+}
+
+func TestInt32Schema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "int32Topic1",
+       }, NewInt32Schema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: int32(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "int32Topic1",
+               SubscriptionName: "sub-2",
+       }, NewInt32Schema(nil))
+       assert.Nil(t, err)
+
+       var res int32
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, int32(1))
+       defer consumer.Close()
+}
+
+func TestInt64Schema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "int64Topic",
+       }, NewInt64Schema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: int64(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "int64Topic",
+               SubscriptionName: "sub-2",
+       }, NewInt64Schema(nil))
+       assert.Nil(t, err)
+
+       var res int64
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, int64(1))
+       defer consumer.Close()
+}
+
+func TestFloatSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "floatTopic",
+       }, NewFloatSchema(nil))
+       assert.Nil(t, err)
+       if err := producer.Send(context.Background(), ProducerMessage{
+               Value: float32(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "floatTopic",
+               SubscriptionName: "sub-2",
+       }, NewFloatSchema(nil))
+       assert.Nil(t, err)
+
+       var res float32
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, float32(1))
+       defer consumer.Close()
+}
+
+func TestDoubleSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       producer, err := client.CreateTypedProducer(ProducerOptions{
+               Topic: "doubleTopic",
+       }, NewDoubleSchema(nil))
+       assert.Nil(t, err)
+       ctx := context.Background()
+       if err := producer.Send(ctx, ProducerMessage{
+               Value: float64(1),
+       }); err != nil {
+               log.Fatal(err)
+       }
+       defer producer.Close()
+
+       consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+               Topic:            "doubleTopic",
+               SubscriptionName: "sub-2",
+       }, NewDoubleSchema(nil))
+       assert.Nil(t, err)
+
+       var res float64
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       err = msg.GetValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, res, float64(1))
+       defer consumer.Close()
+}
diff --git a/pulsar-client-go/pulsar/util_test.go 
b/pulsar-client-go/pulsar/testhelps.go
similarity index 100%
rename from pulsar-client-go/pulsar/util_test.go
rename to pulsar-client-go/pulsar/testhelps.go

Reply via email to