This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac13009  Modify the schema decode method can decode ByteBuf (#5123)
ac13009 is described below

commit ac13009c5990d99fc935f7b83ad10d4c79772990
Author: congbo <39078850+congbobo...@users.noreply.github.com>
AuthorDate: Thu Sep 19 20:25:52 2019 +0800

    Modify the schema decode method can decode ByteBuf (#5123)
---
 .../pulsar/client/api/schema/SchemaReader.java     | 25 ++++++++-
 .../pulsar/client/impl/schema/AbstractSchema.java  | 64 ++++++++++++++++++++++
 .../pulsar/client/impl/schema/BooleanSchema.java   | 12 +++-
 .../pulsar/client/impl/schema/ByteBufSchema.java   | 12 +++-
 .../client/impl/schema/ByteBufferSchema.java       | 18 +++++-
 .../pulsar/client/impl/schema/ByteSchema.java      | 20 ++++++-
 .../pulsar/client/impl/schema/BytesSchema.java     | 16 +++++-
 .../pulsar/client/impl/schema/DateSchema.java      | 14 ++++-
 .../pulsar/client/impl/schema/DoubleSchema.java    | 27 ++++++++-
 .../pulsar/client/impl/schema/FloatSchema.java     | 26 ++++++++-
 .../pulsar/client/impl/schema/IntSchema.java       | 27 ++++++++-
 .../pulsar/client/impl/schema/LongSchema.java      | 26 ++++++++-
 .../pulsar/client/impl/schema/ShortSchema.java     | 26 ++++++++-
 .../pulsar/client/impl/schema/StringSchema.java    | 29 +++++++++-
 .../pulsar/client/impl/schema/StructSchema.java    | 22 +++++++-
 .../pulsar/client/impl/schema/TimeSchema.java      | 14 ++++-
 .../pulsar/client/impl/schema/TimestampSchema.java | 14 ++++-
 .../impl/schema/generic/GenericAvroReader.java     | 30 +++++++++-
 .../impl/schema/generic/GenericJsonReader.java     | 27 ++++++++-
 .../client/impl/schema/reader/AvroReader.java      | 32 ++++++++++-
 .../client/impl/schema/reader/JsonReader.java      | 24 +++++++-
 .../client/impl/schema/reader/ProtobufReader.java  | 26 ++++++++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 26 +++++++++
 .../client/impl/schema/BooleanSchemaTest.java      | 15 ++++-
 .../pulsar/client/impl/schema/BytesSchemaTest.java |  7 +++
 .../pulsar/client/impl/schema/DateSchemaTest.java  | 17 +++++-
 .../client/impl/schema/DoubleSchemaTest.java       | 13 ++++-
 .../pulsar/client/impl/schema/FloatSchemaTest.java | 17 +++++-
 .../pulsar/client/impl/schema/IntSchemaTest.java   | 15 ++++-
 .../pulsar/client/impl/schema/JSONSchemaTest.java  | 24 ++++++++
 .../pulsar/client/impl/schema/LongSchemaTest.java  | 17 +++++-
 .../client/impl/schema/PrimitiveSchemaTest.java    |  7 ++-
 .../client/impl/schema/ProtobufSchemaTest.java     | 16 ++++++
 .../pulsar/client/impl/schema/ShortSchemaTest.java | 15 ++++-
 .../client/impl/schema/StringSchemaTest.java       | 23 ++++++++
 .../pulsar/client/impl/schema/TimeSchemaTest.java  | 10 +++-
 .../client/impl/schema/TimestampSchemaTest.java    | 12 +++-
 37 files changed, 708 insertions(+), 57 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index 29155cc..d073387 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.client.api.schema;
 
+import java.io.InputStream;
+
 /**
  * Deserialize messages from bytes.
  */
+
 public interface SchemaReader<T> {
 
     /**
@@ -29,5 +32,25 @@ public interface SchemaReader<T> {
      * @param bytes the data
      * @return the serialized object
      */
-    T read(byte[] bytes);
+    default T read(byte[] bytes) {
+        return read(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Deserialize the given range of a byte array into a POJO.
+     *
+     * @param bytes the data
+     * @param offset the byte[] initial position
+     * @param length the byte[] read length
+     * @return the deserialized object
+     */
+    T read(byte[] bytes, int offset, int length);
+
+    /**
+     * Deserialize the content of an input stream into a POJO.
+     *
+     * @param inputStream the stream of message
+     * @return the deserialized object
+     */
+    T read(InputStream inputStream);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
new file mode 100644
index 0000000..f459d5c
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+
+abstract class AbstractSchema<T> implements Schema<T> {
+
+    /**
+     * Check if the readable bytes of the message are a valid object for this 
schema.
+     *
+     * <p>The implementation can choose its most efficient approach to 
validate the schema.
+     * If the implementation doesn't override it, this default does not 
verify the bytes:
+     * it throws a {@link SchemaSerializationException} reporting that 
the method
+     * is not supported.
+     *
+     * @param byteBuf the messages to verify
+     * @return true if it is a valid message
+     * @throws SchemaSerializationException if it is not a valid message
+     */
+    void validate(ByteBuf byteBuf) {
+        throw new SchemaSerializationException("This method is not supported");
+    };
+
+    /**
+     * Decode a byteBuf into an object using the schema definition and 
deserializer implementation
+     *
+     * @param byteBuf
+     *            the byte buffer to decode
+     * @return the deserialized object
+     */
+    abstract T decode(ByteBuf byteBuf);
+    /**
+     * Decode a byteBuf into an object using a given version.
+     *
+     * @param byteBuf
+     *            the byte buffer to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates 
using latest version.
+     * @return the deserialized object
+     */
+    T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+        // ignore version by default (most of the primitive schema 
implementations ignore schema version)
+        return decode(byteBuf);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index 5b27dfc..c38c356 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Boolean`.
  */
-public class BooleanSchema implements Schema<Boolean> {
+public class BooleanSchema extends AbstractSchema<Boolean> {
 
     public static BooleanSchema of() {
         return INSTANCE;
@@ -64,6 +64,14 @@ public class BooleanSchema implements Schema<Boolean> {
     }
 
     @Override
+    public Boolean decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        return byteBuf.getBoolean(0);
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 4e7e6d0..e08c80a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -21,14 +21,13 @@ package org.apache.pulsar.client.impl.schema;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * A variant `Bytes` schema that takes {@link io.netty.buffer.ByteBuf}.
  */
-public class ByteBufSchema implements Schema<ByteBuf> {
+public class ByteBufSchema extends AbstractSchema<ByteBuf> {
 
     public static ByteBufSchema of() {
         return INSTANCE;
@@ -59,6 +58,15 @@ public class ByteBufSchema implements Schema<ByteBuf> {
     }
 
     @Override
+    public ByteBuf decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        } else {
+            return byteBuf;
+        }
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index 251cd93..c983aa7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -19,14 +19,16 @@
 package org.apache.pulsar.client.impl.schema;
 
 import java.nio.ByteBuffer;
-import org.apache.pulsar.client.api.Schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * A bytebuffer schema is effectively a `BYTES` schema.
  */
-public class ByteBufferSchema implements Schema<ByteBuffer> {
+public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
 
     public static ByteBufferSchema of() {
         return INSTANCE;
@@ -69,6 +71,18 @@ public class ByteBufferSchema implements Schema<ByteBuffer> {
     }
 
     @Override
+    public ByteBuffer decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        } else {
+            int size = byteBuf.readableBytes();
+            byte[] bytes = new byte[size];
+            byteBuf.readBytes(bytes);
+            return ByteBuffer.wrap(bytes);
+        }
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index d4635f2..d08660f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for 'Byte'.
  */
-public class ByteSchema implements Schema<Byte> {
+public class ByteSchema extends AbstractSchema<Byte> {
 
     public static ByteSchema of() {
         return INSTANCE;
@@ -46,6 +46,13 @@ public class ByteSchema implements Schema<Byte> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 1) {
+            throw new SchemaSerializationException("Size of data received by 
ByteSchema is not 1");
+        }
+    }
+
+    @Override
     public byte[] encode(Byte message) {
         if (null == message) {
             return null;
@@ -64,6 +71,15 @@ public class ByteSchema implements Schema<Byte> {
     }
 
     @Override
+    public Byte decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        return byteBuf.getByte(0);
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9a94fcd..6bf8923 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * A schema for bytes array.
  */
-public class BytesSchema implements Schema<byte[]> {
+public class BytesSchema extends AbstractSchema<byte[]> {
 
     public static BytesSchema of() {
         return INSTANCE;
@@ -48,6 +48,18 @@ public class BytesSchema implements Schema<byte[]> {
     }
 
     @Override
+    public byte[] decode(ByteBuf byteBuf) {
+        if (byteBuf == null) {
+            return null;
+        }
+        int size = byteBuf.readableBytes();
+        byte[] bytes = new byte[size];
+
+        byteBuf.readBytes(bytes, 0, size);
+        return bytes;
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index 819a7d4..7753b44 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -27,7 +27,7 @@ import java.util.Date;
 /**
  * A schema for `java.util.Date` or `java.sql.Date`.
  */
-public class DateSchema implements Schema<Date> {
+public class DateSchema extends AbstractSchema<Date> {
    public static DateSchema of() {
       return INSTANCE;
    }
@@ -59,6 +59,16 @@ public class DateSchema implements Schema<Date> {
    }
 
    @Override
+   public Date decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(byteBuf);
+      return new Date(decode);
+   }
+
+   @Override
    public SchemaInfo getSchemaInfo() {
       return SCHEMA_INFO;
    }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index e617efb..4ff64b2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Double`.
  */
-public class DoubleSchema implements Schema<Double> {
+public class DoubleSchema extends AbstractSchema<Double> {
 
     public static DoubleSchema of() {
         return INSTANCE;
@@ -46,6 +46,14 @@ public class DoubleSchema implements Schema<Double> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 8) {
+            throw new SchemaSerializationException("Size of data received by 
DoubleSchema is not 8");
+        }
+    }
+
+
+    @Override
     public byte[] encode(Double message) {
         if (null == message) {
             return null;
@@ -79,6 +87,21 @@ public class DoubleSchema implements Schema<Double> {
     }
 
     @Override
+    public Double decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        long value = 0;
+
+        for (int i = 0; i < 8; i ++) {
+            value <<= 8;
+            value |= byteBuf.getByte(i) & 0xFF;
+        }
+        return Double.longBitsToDouble(value);
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index 32ac469..7741b38 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Float`.
  */
-public class FloatSchema implements Schema<Float> {
+public class FloatSchema extends AbstractSchema<Float> {
 
     public static FloatSchema of() {
         return INSTANCE;
@@ -46,6 +46,13 @@ public class FloatSchema implements Schema<Float> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 4) {
+            throw new SchemaSerializationException("Size of data received by 
FloatSchema is not 4");
+        }
+    }
+
+    @Override
     public byte[] encode(Float message) {
         if (null == message) {
             return null;
@@ -75,6 +82,21 @@ public class FloatSchema implements Schema<Float> {
     }
 
     @Override
+    public Float decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        int value = 0;
+        for (int i = 0; i < 4; i++) {
+            value <<= 8;
+            value |= byteBuf.getByte(i) & 0xFF;
+        }
+
+        return Float.intBitsToFloat(value);
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index 90092a4..63aa0b1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Integer`.
  */
-public class IntSchema implements Schema<Integer> {
+public class IntSchema extends AbstractSchema<Integer> {
 
     public static IntSchema of() {
         return INSTANCE;
@@ -46,6 +46,13 @@ public class IntSchema implements Schema<Integer> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 4) {
+            throw new SchemaSerializationException("Size of data received by 
IntSchema is not 4");
+        }
+    }
+
+    @Override
     public byte[] encode(Integer message) {
         if (null == message) {
             return null;
@@ -74,6 +81,22 @@ public class IntSchema implements Schema<Integer> {
     }
 
     @Override
+    public Integer decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        int value = 0;
+
+        for (int i = 0; i < 4; i++) {
+            value <<= 8;
+            value |= byteBuf.getByte(i) & 0xFF;
+        }
+
+        return value;
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index b252279..4b4da71 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Long`.
  */
-public class LongSchema implements Schema<Long> {
+public class LongSchema extends AbstractSchema<Long> {
 
     public static LongSchema of() {
         return INSTANCE;
@@ -46,6 +46,13 @@ public class LongSchema implements Schema<Long> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 8) {
+            throw new SchemaSerializationException("Size of data received by 
LongSchema is not 8");
+        }
+    }
+
+    @Override
     public byte[] encode(Long data) {
         if (null == data) {
             return null;
@@ -78,6 +85,21 @@ public class LongSchema implements Schema<Long> {
     }
 
     @Override
+    public Long decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        long value = 0L;
+        for (int i = 0; i < 8; i++) {
+            value <<= 8;
+            value |= byteBuf.getByte(i) & 0xFF;
+        }
+
+        return value;
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index f1ec133..932f2df 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * A schema for `Short`.
  */
-public class ShortSchema implements Schema<Short> {
+public class ShortSchema extends AbstractSchema<Short> {
 
     public static ShortSchema of() {
         return INSTANCE;
@@ -46,6 +46,13 @@ public class ShortSchema implements Schema<Short> {
     }
 
     @Override
+    public void validate(ByteBuf message) {
+        if (message.readableBytes() != 2) {
+            throw new SchemaSerializationException("Size of data received by 
ShortSchema is not 2");
+        }
+    }
+
+    @Override
     public byte[] encode(Short message) {
         if (null == message) {
             return null;
@@ -72,6 +79,21 @@ public class ShortSchema implements Schema<Short> {
     }
 
     @Override
+    public Short decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        }
+        validate(byteBuf);
+        short value = 0;
+
+        for (int i = 0; i < 2; i++) {
+            value <<= 8;
+            value |= byteBuf.getByte(i) & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return SCHEMA_INFO;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 6859503..3c7959f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -22,7 +22,9 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pulsar.client.api.Schema;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -32,7 +34,7 @@ import java.nio.charset.StandardCharsets;
 /**
  * Schema definition for Strings encoded in UTF-8 format.
  */
-public class StringSchema implements Schema<String> {
+public class StringSchema extends AbstractSchema<String> {
 
     static final String CHARSET_KEY = "__charset";
 
@@ -40,6 +42,13 @@ public class StringSchema implements Schema<String> {
         return UTF8;
     }
 
+    private static final FastThreadLocal<byte[]> tmpBuffer = new 
FastThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+            return new byte[1024];
+        }
+    };
+
     public static StringSchema fromSchemaInfo(SchemaInfo schemaInfo) {
         checkArgument(SchemaType.STRING == schemaInfo.getType(), "Not a string 
schema");
         String charsetName = schemaInfo.getProperties().get(CHARSET_KEY);
@@ -93,6 +102,22 @@ public class StringSchema implements Schema<String> {
         }
     }
 
+    public String decode(ByteBuf byteBuf) {
+        if (null == byteBuf) {
+            return null;
+        } else {
+            int size = byteBuf.readableBytes();
+            byte[] bytes = tmpBuffer.get();
+            if (size > bytes.length) {
+                bytes = new byte[size * 2];
+                tmpBuffer.set(bytes);
+            }
+            byteBuf.readBytes(bytes, 0, size);
+
+            return new String(bytes, 0, size, charset);
+        }
+    }
+
     public SchemaInfo getSchemaInfo() {
         return schemaInfo;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index a6608f4..b38df41 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -26,12 +26,13 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.schema.SchemaReader;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
  * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
  * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
  */
-public abstract class StructSchema<T> implements Schema<T> {
+public abstract class StructSchema<T> extends AbstractSchema<T> {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(StructSchema.class);
 
@@ -61,6 +62,7 @@ public abstract class StructSchema<T> implements Schema<T> {
     protected SchemaReader<T> reader;
     protected SchemaWriter<T> writer;
     protected SchemaInfoProvider schemaInfoProvider;
+
     private final LoadingCache<BytesSchemaVersion, SchemaReader<T>> 
readerCache = CacheBuilder.newBuilder().maximumSize(100000)
             .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
                 @Override
@@ -100,6 +102,22 @@ public abstract class StructSchema<T> implements Schema<T> 
{
     }
 
     @Override
+    public T decode(ByteBuf byteBuf) {
+        return reader.read(new ByteBufInputStream(byteBuf));
+    }
+
+    @Override
+    public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+        try {
+            return 
readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new 
ByteBufInputStream(byteBuf));
+        } catch (ExecutionException e) {
+            LOG.error("Can't get generic schema for topic {} schema version 
{}",
+                    schemaInfoProvider.getTopicName(), 
Hex.encodeHexString(schemaVersion), e);
+            throw new RuntimeException("Can't get generic schema for topic " + 
schemaInfoProvider.getTopicName());
+        }
+    }
+
+    @Override
     public SchemaInfo getSchemaInfo() {
         return this.schemaInfo;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index 212b555..66aa4f9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -27,7 +27,7 @@ import java.sql.Time;
 /**
  * A schema for `java.sql.Time`.
  */
-public class TimeSchema implements Schema<Time> {
+public class TimeSchema extends AbstractSchema<Time> {
    public static TimeSchema of() {
       return INSTANCE;
    }
@@ -59,6 +59,16 @@ public class TimeSchema implements Schema<Time> {
    }
 
    @Override
+   public Time decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(byteBuf);
+      return new Time(decode);
+   }
+
+   @Override
    public SchemaInfo getSchemaInfo() {
       return SCHEMA_INFO;
    }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index de8646f..9bfbaba 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import org.apache.pulsar.client.api.Schema;
+import io.netty.buffer.ByteBuf;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -27,7 +27,7 @@ import java.sql.Timestamp;
 /**
  * A schema for `java.sql.Timestamp`.
  */
-public class TimestampSchema implements Schema<Timestamp> {
+public class TimestampSchema extends AbstractSchema<Timestamp> {
    public static TimestampSchema of() {
       return INSTANCE;
    }
@@ -59,6 +59,16 @@ public class TimestampSchema implements Schema<Timestamp> {
    }
 
    @Override
+   public Timestamp decode(ByteBuf byteBuf) {
+      if (null == byteBuf) {
+         return null;
+      }
+
+      Long decode = LongSchema.of().decode(byteBuf);
+      return new Timestamp(decode);
+   }
+
+   @Override
    public SchemaInfo getSchemaInfo() {
       return SCHEMA_INFO;
    }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index bc4f65e..a276cb9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -29,8 +29,12 @@ import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -64,9 +68,9 @@ public class GenericAvroReader implements 
SchemaReader<GenericRecord> {
     }
 
     @Override
-    public GenericAvroRecord read(byte[] bytes) {
+    public GenericAvroRecord read(byte[] bytes, int offset, int length) {
         try {
-            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, 
offset, length, null);
             org.apache.avro.generic.GenericRecord avroRecord =
                     (org.apache.avro.generic.GenericRecord)reader.read(
                     null,
@@ -76,4 +80,26 @@ public class GenericAvroReader implements 
SchemaReader<GenericRecord> {
             throw new SchemaSerializationException(e);
         }
     }
+
+    /**
+     * Reads one Avro binary-encoded record from the stream and wraps it as a generic record.
+     *
+     * <p>The stream is always closed before this method returns; a failure to close it is
+     * logged and not propagated.
+     *
+     * @param inputStream stream to decode the record from
+     * @return the decoded record
+     * @throws SchemaSerializationException if an {@link IOException} occurs while decoding
+     */
+    @Override
+    public GenericRecord read(InputStream inputStream) {
+        try {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+            org.apache.avro.generic.GenericRecord avroRecord =
+                    (org.apache.avro.generic.GenericRecord)reader.read(
+                            null,
+                            decoder);
+            return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // Pass the exception itself: the message has no "{}" placeholder, so a
+                // plain string argument would be dropped and the failure detail lost.
+                log.error("GenericAvroReader close inputStream close error", e);
+            }
+        }
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(GenericAvroReader.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index 1d2fced..9eeb15f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -24,9 +24,12 @@ import 
org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.common.schema.SchemaInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -48,12 +51,30 @@ public class GenericJsonReader implements 
SchemaReader<GenericRecord> {
         this.schemaVersion = schemaVersion;
     }
     @Override
-    public GenericJsonRecord read(byte[] bytes) {
+    public GenericJsonRecord read(byte[] bytes, int offset, int length) {
+        try {
+            JsonNode jn = objectMapper.readTree(new String(bytes, offset, 
length, UTF_8));
+            return new GenericJsonRecord(schemaVersion, fields, jn);
+        } catch (IOException ioe) {
+            throw new SchemaSerializationException(ioe);
+        }
+    }
+
+    /**
+     * Parses one JSON document from the stream and wraps it as a generic record.
+     *
+     * <p>The stream is always closed before this method returns; a failure to close it is
+     * logged and not propagated.
+     *
+     * @param inputStream stream to parse the JSON document from
+     * @return the decoded record
+     * @throws SchemaSerializationException if an {@link IOException} occurs while parsing
+     */
+    @Override
+    public GenericRecord read(InputStream inputStream) {
         try {
-            JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
+            JsonNode jn = objectMapper.readTree(inputStream);
             return new GenericJsonRecord(schemaVersion, fields, jn);
         } catch (IOException ioe) {
             throw new SchemaSerializationException(ioe);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // Pass the exception itself: the message has no "{}" placeholder, so a
+                // plain string argument would be dropped and the failure detail lost.
+                log.error("GenericJsonReader close inputStream close error", e);
+            }
         }
     }
+
+    private static final Logger log = 
LoggerFactory.getLogger(GenericJsonReader.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
index c502df5..dc5bfe6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -25,7 +25,11 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.io.InputStream;
 
 public class AvroReader<T> implements SchemaReader<T> {
 
@@ -42,17 +46,39 @@ public class AvroReader<T> implements SchemaReader<T> {
     }
 
     @Override
-    public T read(byte[] bytes) {
+    public T read(byte[] bytes, int offset, int length) {
+        try {
+            BinaryDecoder decoderFromCache = decoders.get();
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, 
offset, length, decoderFromCache);
+            if (decoderFromCache == null) {
+                decoders.set(decoder);
+            }
+            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, 
offset, length, decoder));
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+    /**
+     * Reads one Avro binary-encoded value from the stream, reusing the decoder held in
+     * {@code decoders} when one is already present.
+     *
+     * <p>The stream is always closed before this method returns; a failure to close it is
+     * logged and not propagated.
+     *
+     * @param inputStream stream to decode the value from
+     * @return the decoded value
+     * @throws SchemaSerializationException if an {@link IOException} occurs while decoding
+     */
+    @Override
+    public T read(InputStream inputStream) {
         try {
             BinaryDecoder decoderFromCache = decoders.get();
-            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, 
decoderFromCache);
+            BinaryDecoder decoder = 
DecoderFactory.get().binaryDecoder(inputStream, decoderFromCache);
             if (decoderFromCache == null) {
                 decoders.set(decoder);
             }
-            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, 
decoder));
+            return reader.read(null, 
DecoderFactory.get().binaryDecoder(inputStream, decoder));
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // Pass the exception itself: the message has no "{}" placeholder, so a
+                // plain string argument would be dropped and the failure detail lost.
+                log.error("AvroReader close inputStream close error", e);
+            }
         }
     }
 
+    private static final Logger log = 
LoggerFactory.getLogger(AvroReader.class);
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
index 05ef7cb..7867ddd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.client.impl.schema.reader;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 public class JsonReader<T> implements SchemaReader<T> {
     private final Class<T> pojo;
@@ -34,11 +37,28 @@ public class JsonReader<T> implements SchemaReader<T> {
     }
 
     @Override
-    public T read(byte[] bytes) {
+    public T read(byte[] bytes, int offset, int length) {
         try {
-            return objectMapper.readValue(bytes, this.pojo);
+            return objectMapper.readValue(bytes, offset, length, this.pojo);
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
         }
     }
+
+    /**
+     * Deserializes one JSON value of the configured POJO class from the stream.
+     *
+     * <p>The stream is always closed before this method returns; a failure to close it is
+     * logged and not propagated.
+     *
+     * @param inputStream stream to read the JSON value from
+     * @return the deserialized object
+     * @throws SchemaSerializationException if an {@link IOException} occurs while reading
+     */
+    @Override
+    public T read(InputStream inputStream) {
+        try {
+            return objectMapper.readValue(inputStream, pojo);
+        } catch (IOException e) {
+            throw new SchemaSerializationException(e);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // Pass the exception itself: the message has no "{}" placeholder, so a
+                // plain string argument would be dropped and the failure detail lost.
+                log.error("JsonReader close inputStream close error", e);
+            }
+        }
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(JsonReader.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
index bca952e..01c0c12 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
@@ -23,6 +23,12 @@ import com.google.protobuf.Parser;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
 public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> 
implements SchemaReader<T> {
     private Parser<T> tParser;
 
@@ -31,12 +37,28 @@ public class ProtobufReader<T extends 
com.google.protobuf.GeneratedMessageV3> im
     }
 
     @Override
-    public T read(byte[] bytes) {
+    public T read(byte[] bytes, int offset, int length) {
+        try {
+            return this.tParser.parseFrom(bytes, offset, length);
+        } catch (InvalidProtocolBufferException e) {
+            throw new SchemaSerializationException(e);
+        }
+    }
+
+    /**
+     * Parses one protobuf message from the stream with the configured parser.
+     *
+     * <p>The stream is always closed before this method returns; a failure to close it is
+     * logged and not propagated.
+     *
+     * @param inputStream stream to parse the message from
+     * @return the parsed message
+     * @throws SchemaSerializationException if the parser reports an invalid message
+     */
+    @Override
+    public T read(InputStream inputStream) {
         try {
-            return this.tParser.parseFrom(bytes);
+            return this.tParser.parseFrom(inputStream);
         } catch (InvalidProtocolBufferException e) {
             throw new SchemaSerializationException(e);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // Pass the exception itself: the message has no "{}" placeholder, so a
+                // plain string argument would be dropped and the failure detail lost.
+                log.error("ProtobufReader close inputStream close error", e);
+            }
         }
     }
 
+    private static final Logger log = 
LoggerFactory.getLogger(ProtobufReader.class);
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 6e31dbd..62acef4 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -30,6 +30,8 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Date;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -302,4 +304,28 @@ public class AvroSchemaTest {
     assertEquals(object, nasaMission);
   }
 
+    /**
+     * Checks that a Foo encoded to bytes decodes to an equal object when those bytes
+     * are supplied through a ByteBuf.
+     */
+    @Test
+    public void testDecodeByteBuf() {
+        AvroSchema<Foo> avroSchema =
+                AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Foo foo1 = new Foo();
+        foo1.setField1("foo1");
+        foo1.setField2("bar1");
+        foo1.setField4(new Bar());
+        foo1.setFieldUnableNull("notNull");
+
+        byte[] bytes1 = avroSchema.encode(foo1);
+        Assert.assertTrue(bytes1.length > 0);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes1.length);
+        try {
+            byteBuf.writeBytes(bytes1);
+            Foo object1 = avroSchema.decode(byteBuf);
+            assertEquals(object1, foo1);
+        } finally {
+            // The test allocated the buffer, so the test hands it back to the allocator.
+            byteBuf.release();
+        }
+    }
+
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java
index 9f62d1a..fdcf471 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -54,12 +56,23 @@ public class BooleanSchemaTest {
         BooleanSchema schema = BooleanSchema.of();
         Assert.assertEquals(new Boolean(true), schema.decode(trueBytes));
         Assert.assertEquals(new Boolean(false), schema.decode(falseBytes));
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(1);
+        byteBuf.writeBytes(trueBytes);
+        Assert.assertEquals(new Boolean(true), schema.decode(byteBuf));
+        byteBuf.writerIndex(0);
+        byteBuf.writeBytes(falseBytes);
+
+        Assert.assertEquals(new Boolean(false), schema.decode(byteBuf));
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(BooleanSchema.of().encode(null));
-        Assert.assertNull(BooleanSchema.of().decode(null));
+        Assert.assertNull(BooleanSchema.of().decode(byteBuf));
+        Assert.assertNull(BooleanSchema.of().decode(bytes));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
index 9a2b6ca..48bd5bc 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.Test;
 
@@ -47,6 +50,10 @@ public class BytesSchemaTest {
 
         byte[] deserializedData = schema.decode(serializedData);
         assertSame(data, deserializedData);
+        ByteBuf byteBuf = 
ByteBufAllocator.DEFAULT.buffer(deserializedData.length);
+        byteBuf.writeBytes(deserializedData);
+        assertEquals(data, ((BytesSchema)schema).decode(byteBuf));
+
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java
index 0e707ed..69499d8 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -46,7 +48,11 @@ public class DateSchemaTest {
     public void testSchemaEncodeDecodeFidelity() {
         DateSchema schema = DateSchema.of();
         Date date = new Date();
-        Assert.assertEquals(date, schema.decode(schema.encode(date)));
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byte[] bytes = schema.encode(date);
+        byteBuf.writeBytes(bytes);
+        Assert.assertEquals(date, schema.decode(bytes));
+        Assert.assertEquals(date, schema.decode(byteBuf));
     }
 
     @Test
@@ -63,13 +69,20 @@ public class DateSchemaTest {
         };
         long expected = 10*65536 + 24*256 + 42;
         DateSchema schema = DateSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byteBuf.writeBytes(byteData);
         Assert.assertEquals(expected, schema.decode(byteData).getTime());
+        Assert.assertEquals(expected, schema.decode(byteBuf).getTime());
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
+
         Assert.assertNull(DateSchema.of().encode(null));
-        Assert.assertNull(DateSchema.of().decode(null));
+        Assert.assertNull(DateSchema.of().decode(byteBuf));
+        Assert.assertNull(DateSchema.of().decode(bytes));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java
index f5a1ab3..6fc76de 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -45,13 +47,20 @@ public class DoubleSchemaTest {
     public void testSchemaEncodeDecodeFidelity() {
         DoubleSchema schema = DoubleSchema.of();
         Double dbl = new Double(1234578.8754321);
-        Assert.assertEquals(dbl, schema.decode(schema.encode(dbl)));
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byte[] bytes = schema.encode(dbl);
+        byteBuf.writeBytes(bytes);
+        Assert.assertEquals(dbl, schema.decode(bytes));
+        Assert.assertEquals(dbl, schema.decode(byteBuf));
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(DoubleSchema.of().encode(null));
-        Assert.assertNull(DoubleSchema.of().decode(null));
+        Assert.assertNull(DoubleSchema.of().decode(byteBuf));
+        Assert.assertNull(DoubleSchema.of().decode(bytes));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java
index b915582..5d5dd0b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -40,14 +42,23 @@ public class FloatSchemaTest {
     @Test
     public void testSchemaEncodeDecodeFidelity() {
         FloatSchema schema = FloatSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4);
         Float dbl = new Float(1234578.8754321);
-        Assert.assertEquals(dbl, schema.decode(schema.encode(dbl)));
+        byte[] bytes = schema.encode(dbl);
+        byteBuf.writeBytes(schema.encode(dbl));
+        Assert.assertEquals(dbl, schema.decode(bytes));
+        Assert.assertEquals(dbl, schema.decode(byteBuf));
+
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(FloatSchema.of().encode(null));
-        Assert.assertNull(FloatSchema.of().decode(null));
+        Assert.assertNull(FloatSchema.of().decode(bytes));
+        Assert.assertNull(FloatSchema.of().decode(byteBuf));
     }
-
 }
+
+
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java
index fb17811..98f00ec 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -40,10 +42,15 @@ public class IntSchemaTest {
     public void testSchemaEncodeDecodeFidelity() {
         IntSchema schema = IntSchema.of();
         int start = 348592040;
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4);
         for (int i = 0; i < 100; ++i) {
             byte[] encode = schema.encode(start + i);
+            byteBuf.writerIndex(0);
+            byteBuf.writeBytes(encode);
             int decoded = schema.decode(encode);
             Assert.assertEquals(decoded, start + i);
+            decoded = schema.decode(byteBuf);
+            Assert.assertEquals(decoded, start + i);
         }
     }
 
@@ -55,15 +62,21 @@ public class IntSchemaTest {
                24,
                42
         };
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4);
         Integer expected = 10*65536 + 24*256 + 42;
         IntSchema schema = IntSchema.of();
+        byteBuf.writeBytes(byteData);
         Assert.assertEquals(expected, schema.decode(byteData));
+        Assert.assertEquals(expected, schema.decode(byteBuf));
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(IntSchema.of().encode(null));
-        Assert.assertNull(IntSchema.of().decode(null));
+        Assert.assertNull(IntSchema.of().decode(bytes));
+        Assert.assertNull(IntSchema.of().decode(byteBuf));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index 5efb82b..f280113 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.impl.schema;
 import java.util.Collections;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -302,4 +304,26 @@ public class JSONSchemaTest {
         JSONSchema<Foo> jsonSchema = 
JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         jsonSchema.decode(new byte[0]);
     }
+
+    /**
+     * Checks that a Foo encoded to JSON bytes decodes to an equal object when those
+     * bytes are supplied through a ByteBuf.
+     */
+    @Test
+    public void testDecodeByteBuf() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(
+                SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+
+        Foo foo1 = new Foo();
+        foo1.setField1("foo1");
+        foo1.setField2("bar1");
+        foo1.setField4(new Bar());
+        foo1.setFieldUnableNull("notNull");
+
+        byte[] bytes1 = jsonSchema.encode(foo1);
+        Assert.assertTrue(bytes1.length > 0);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes1.length);
+        try {
+            byteBuf.writeBytes(bytes1);
+            assertEquals(jsonSchema.decode(byteBuf), foo1);
+        } finally {
+            // The test allocated the buffer, so the test hands it back to the allocator.
+            byteBuf.release();
+        }
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java
index 84c217c..397f5ac 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -43,11 +45,17 @@ public class LongSchemaTest {
     @Test
     public void testSchemaEncodeDecodeFidelity() {
         LongSchema longSchema = LongSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
         long start = 348592040;
         for (int i = 0; i < 100; ++i) {
             byte[] encode = longSchema.encode(start + i);
             long decoded = longSchema.decode(encode);
             Assert.assertEquals(decoded, start + i);
+            byteBuf.writerIndex(0);
+            byteBuf.writeBytes(encode);
+
+            decoded = longSchema.decode(byteBuf);
+            Assert.assertEquals(decoded, start + i);
         }
     }
 
@@ -65,13 +73,20 @@ public class LongSchemaTest {
         };
         Long expected = 10*65536l + 24*256 + 42;
         LongSchema longSchema = LongSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
+        byteBuf.writeBytes(byteData);
+
         Assert.assertEquals(expected, longSchema.decode(byteData));
+        Assert.assertEquals(expected, longSchema.decode(byteBuf));
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(LongSchema.of().encode(null));
-        Assert.assertNull(LongSchema.of().decode(null));
+        Assert.assertNull(LongSchema.of().decode(byteBuf));
+        Assert.assertNull(LongSchema.of().decode(bytes));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
index 31d1c14..264bf4c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
 import java.sql.Date;
@@ -88,10 +89,14 @@ public class PrimitiveSchemaTest {
     @Test(dataProvider = "schemas")
     public void allSchemasShouldSupportNull(Map<Schema, List<Object>> 
testData) {
         for (Schema<?> schema : testData.keySet()) {
+            byte[] bytes = null;
+            ByteBuf byteBuf =  null;
             assertNull(schema.encode(null),
                 "Should support null in " + schema.getSchemaInfo().getName() + 
" serialization");
-            assertNull(schema.decode( null),
+            assertNull(schema.decode(bytes),
                 "Should support null in " + schema.getSchemaInfo().getName() + 
" deserialization");
+            assertNull(((AbstractSchema)schema).decode(byteBuf),
+                    "Should support null in " + 
schema.getSchemaInfo().getName() + " deserialization");
         }
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index 57ccd56..fc03412 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl.schema;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -111,4 +113,18 @@ public class ProtobufSchemaTest {
         Assert.assertEquals(new 
ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()),
 EXPECTED_PARSING_INFO);
 
     }
+
+    /**
+     * Checks that a default TestMessage encoded to bytes decodes to an equal message
+     * when those bytes are supplied through a ByteBuf.
+     */
+    @Test
+    public void testDecodeByteBuf() throws JsonProcessingException {
+        ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                = ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+        org.apache.pulsar.client.schema.proto.Test.TestMessage testMessage =
+                org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build();
+        byte[] bytes = protobufSchema.encode(
+                org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build());
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
+        try {
+            byteBuf.writeBytes(bytes);
+            Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf));
+        } finally {
+            // The test allocated the buffer, so the test hands it back to the allocator.
+            byteBuf.release();
+        }
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java
index 1252dc9..54fd35b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,11 +39,16 @@ public class ShortSchemaTest {
     @Test
     public void testSchemaEncodeDecodeFidelity() {
         ShortSchema schema = ShortSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(2);
         short start = 3440;
         for (short i = 0; i < 100; ++i) {
             byte[] encode = schema.encode((short)(start + i));
+            byteBuf.writerIndex(0);
+            byteBuf.writeBytes(encode);
             int decoded = schema.decode(encode);
             Assert.assertEquals(decoded, start + i);
+            decoded = schema.decode(byteBuf);
+            Assert.assertEquals(decoded, start + i);
         }
     }
 
@@ -53,13 +60,19 @@ public class ShortSchemaTest {
         };
         Short expected = 24*256 + 42;
         ShortSchema schema = ShortSchema.of();
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(2);
+        byteBuf.writeBytes(byteData);
         Assert.assertEquals(expected, schema.decode(byteData));
+        Assert.assertEquals(expected, schema.decode(byteBuf));
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(ShortSchema.of().encode(null));
-        Assert.assertNull(ShortSchema.of().decode(null));
+        Assert.assertNull(ShortSchema.of().decode(byteBuf));
+        Assert.assertNull(ShortSchema.of().decode(bytes));
     }
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index 6aa9dd7..b09bf4d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
@@ -29,6 +30,9 @@ import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.DataProvider;
@@ -51,6 +55,11 @@ public class StringSchemaTest {
 
         String decodedString = schema.decode(data);
         assertEquals(decodedString, myString);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
+        byteBuf.writeBytes(data);
+
+        assertEquals(schema.decode(byteBuf), myString);
     }
 
     @Test
@@ -69,6 +78,11 @@ public class StringSchemaTest {
 
         String decodedString = schema.decode(data);
         assertEquals(decodedString, myString);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
+        byteBuf.writeBytes(data);
+
+        assertEquals(schema.decode(byteBuf), myString);
     }
 
     @Test
@@ -86,6 +100,10 @@ public class StringSchemaTest {
 
         String decodedString = schema.decode(data);
         assertEquals(decodedString, myString);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
+        byteBuf.writeBytes(data);
+        assertEquals(schema.decode(byteBuf), myString);
     }
 
     @DataProvider(name = "charsets")
@@ -117,6 +135,11 @@ public class StringSchemaTest {
 
         String decodedString = schema.decode(data);
         assertEquals(decodedString, myString);
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
+        byteBuf.writeBytes(data);
+
+        assertEquals(schema.decode(byteBuf), myString);
     }
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java
index 7b5cd56..fab20e7 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -61,15 +63,21 @@ public class TimeSchemaTest {
                24,
                42
         };
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(byteData.length);
+        byteBuf.writeBytes(byteData);
         long expected = 10*65536 + 24*256 + 42;
         TimeSchema schema = TimeSchema.of();
         Assert.assertEquals(expected, schema.decode(byteData).getTime());
+        Assert.assertEquals(expected, schema.decode(byteBuf).getTime());
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(TimeSchema.of().encode(null));
-        Assert.assertNull(TimeSchema.of().decode(null));
+        Assert.assertNull(TimeSchema.of().decode(bytes));
+        Assert.assertNull(TimeSchema.of().decode(byteBuf));
     }
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java
index 1d864b6..022ba38 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -61,15 +63,23 @@ public class TimestampSchemaTest {
                24,
                42
         };
+
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(byteData.length);
+        byteBuf.writeBytes(byteData);
         long expected = 10*65536 + 24*256 + 42;
         TimestampSchema schema = TimestampSchema.of();
         Assert.assertEquals(expected, schema.decode(byteData).getTime());
+        Assert.assertEquals(expected, schema.decode(byteBuf).getTime());
+
     }
 
     @Test
     public void testNullEncodeDecode() {
+        ByteBuf byteBuf = null;
+        byte[] bytes = null;
         Assert.assertNull(TimestampSchema.of().encode(null));
-        Assert.assertNull(TimestampSchema.of().decode(null));
+        Assert.assertNull(TimestampSchema.of().decode(byteBuf));
+        Assert.assertNull(TimestampSchema.of().decode(bytes));
     }
 
 }

Reply via email to