sijie closed pull request #2500: [schema] add schemas for primitive types
URL: https://github.com/apache/incubator-pulsar/pull/2500
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff once a pull request from a fork has been merged,
the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 39248d2719..46a38c100c 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -19,6 +19,11 @@
 package org.apache.pulsar.client.api;
 
 public class SchemaSerializationException extends RuntimeException {
+
+    public SchemaSerializationException(String message) {
+        super(message);
+    }
+
     public SchemaSerializationException(Throwable cause) {
         super(cause);
     }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
new file mode 100644
index 0000000000..ee8ba66fa6
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `ByteBuffer`.
+ */
+public class ByteBufferSchema implements Schema<ByteBuffer> {
+
+    public static ByteBufferSchema of() {
+        return INSTANCE;
+    }
+
+    private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("ByteBuffer")
+        .setType(SchemaType.BYTEBUFFER)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        data.rewind();
+
+        if (data.hasArray()) {
+            byte[] arr = data.array();
+            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
+                return arr;
+            }
+        }
+
+        byte[] ret = new byte[data.remaining()];
+        data.get(ret, 0, ret.length);
+        data.rewind();
+        return ret;
+    }
+
+    @Override
+    public ByteBuffer decode(byte[] data) {
+        if (null == data) {
+            return null;
+        } else {
+            return ByteBuffer.wrap(data);
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
new file mode 100644
index 0000000000..da82216df8
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Byte`.
+ */
+public class ByteSchema implements Schema<Byte> {
+
+    public static ByteSchema of() {
+        return INSTANCE;
+    }
+
+    private static final ByteSchema INSTANCE = new ByteSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT8")
+        .setType(SchemaType.INT8)
+        .setSchema(new byte[0]);
+
+
+    @Override
+    public byte[] encode(Byte message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[]{message};
+        }
+    }
+
+    @Override
+    public Byte decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 1) {
+            throw new SchemaSerializationException("Size of data received by 
ByteSchema is not 1");
+        }
+        return bytes[0];
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 042f04c1bb..9a94fcd9b4 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -20,8 +20,23 @@
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
+/**
+ * A schema for byte arrays.
+ */
 public class BytesSchema implements Schema<byte[]> {
+
+    public static BytesSchema of() {
+        return INSTANCE;
+    }
+
+    private static final BytesSchema INSTANCE = new BytesSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Bytes")
+        .setType(SchemaType.BYTES)
+        .setSchema(new byte[0]);
+
     @Override
     public byte[] encode(byte[] message) {
         return message;
@@ -34,6 +49,6 @@
 
     @Override
     public SchemaInfo getSchemaInfo() {
-        return null;
+        return SCHEMA_INFO;
     }
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
new file mode 100644
index 0000000000..8ffd9d3ac5
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Double`.
+ */
+public class DoubleSchema implements Schema<Double> {
+
+    public static DoubleSchema of() {
+        return INSTANCE;
+    }
+
+    private static final DoubleSchema INSTANCE = new DoubleSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Double")
+        .setType(SchemaType.DOUBLE)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Double message) {
+        if (null == message) {
+            return null;
+        } else {
+            long bits = Double.doubleToLongBits(message);
+            return new byte[] {
+                (byte) (bits >>> 56),
+                (byte) (bits >>> 48),
+                (byte) (bits >>> 40),
+                (byte) (bits >>> 32),
+                (byte) (bits >>> 24),
+                (byte) (bits >>> 16),
+                (byte) (bits >>> 8),
+                (byte) bits
+            };
+        }
+    }
+
+    @Override
+    public Double decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 8) {
+            throw new SchemaSerializationException("Size of data received by 
DoubleSchema is not 8");
+        }
+        long value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return Double.longBitsToDouble(value);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
new file mode 100644
index 0000000000..b7c61fb9c1
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Float`.
+ */
+public class FloatSchema implements Schema<Float> {
+
+    public static FloatSchema of() {
+        return INSTANCE;
+    }
+
+    private static final FloatSchema INSTANCE = new FloatSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Float")
+        .setType(SchemaType.FLOAT)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Float message) {
+        if (null == message) {
+            return null;
+        } else {
+            long bits = Float.floatToRawIntBits(message);
+            return new byte[] {
+                (byte) (bits >>> 24),
+                (byte) (bits >>> 16),
+                (byte) (bits >>> 8),
+                (byte) bits
+            };
+        }
+    }
+
+    @Override
+    public Float decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 4) {
+            throw new SchemaSerializationException("Size of data received by 
FloatSchema is not 4");
+        }
+        int value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return Float.intBitsToFloat(value);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
new file mode 100644
index 0000000000..33bd73b65b
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Integer`.
+ */
+public class IntSchema implements Schema<Integer> {
+
+    public static IntSchema of() {
+        return INSTANCE;
+    }
+
+    private static final IntSchema INSTANCE = new IntSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT32")
+        .setType(SchemaType.INT32)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Integer message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (message >>> 24),
+                (byte) (message >>> 16),
+                (byte) (message >>> 8),
+                message.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Integer decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 4) {
+            throw new SchemaSerializationException("Size of data received by 
IntSchema is not 4");
+        }
+        int value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
new file mode 100644
index 0000000000..e82a901bb7
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Long`.
+ */
+public class LongSchema implements Schema<Long> {
+
+    public static LongSchema of() {
+        return INSTANCE;
+    }
+
+    private static final LongSchema INSTANCE = new LongSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT64")
+        .setType(SchemaType.INT64)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Long data) {
+        if (null == data) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (data >>> 56),
+                (byte) (data >>> 48),
+                (byte) (data >>> 40),
+                (byte) (data >>> 32),
+                (byte) (data >>> 24),
+                (byte) (data >>> 16),
+                (byte) (data >>> 8),
+                data.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Long decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 8) {
+            throw new SchemaSerializationException("Size of data received by 
LongSchema is not 8");
+        }
+        long value = 0L;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
new file mode 100644
index 0000000000..fc73b89470
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Short`.
+ */
+public class ShortSchema implements Schema<Short> {
+
+    public static ShortSchema of() {
+        return INSTANCE;
+    }
+
+    private static final ShortSchema INSTANCE = new ShortSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT16")
+        .setType(SchemaType.INT16)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Short message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (message >>> 8),
+                message.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Short decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 2) {
+            throw new SchemaSerializationException("Size of data received by 
ShortSchema is not 2");
+        }
+        short value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 4eca216088..11b5c5f79e 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -29,6 +29,17 @@
  * Schema definition for Strings encoded in UTF-8 format.
  */
 public class StringSchema implements Schema<String> {
+
+    public static StringSchema utf8() {
+        return UTF8;
+    }
+
+    private static final StringSchema UTF8 = new 
StringSchema(StandardCharsets.UTF_8);
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("String")
+        .setType(SchemaType.STRING)
+        .setSchema(new byte[0]);
+
     private final Charset charset;
     private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
 
@@ -41,18 +52,22 @@ public StringSchema(Charset charset) {
     }
 
     public byte[] encode(String message) {
-        return message.getBytes(charset);
+        if (null == message) {
+            return null;
+        } else {
+            return message.getBytes(charset);
+        }
     }
 
     public String decode(byte[] bytes) {
-        return new String(bytes, charset);
+        if (null == bytes) {
+            return null;
+        } else {
+            return new String(bytes, charset);
+        }
     }
 
     public SchemaInfo getSchemaInfo() {
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setName("String");
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setSchema(new byte[0]);
-        return schemaInfo;
+        return SCHEMA_INFO;
     }
 }
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
new file mode 100644
index 0000000000..fdac5395e2
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for primitive schemas.
+ */
+@Slf4j
+public class PrimitiveSchemaTest {
+
+    final private Map<Schema, List<Object>> testData = new HashMap() {
+        {
+            put(StringSchema.utf8(), Arrays.asList("my string"));
+            put(ByteSchema.of(), Arrays.asList((byte) 32767, (byte) -32768));
+            put(ShortSchema.of(), Arrays.asList((short) 32767, (short) 
-32768));
+            put(IntSchema.of(), Arrays.asList((int) 423412424, (int) 
-41243432));
+            put(LongSchema.of(), Arrays.asList(922337203685477580L, 
-922337203685477581L));
+            put(FloatSchema.of(), Arrays.asList(5678567.12312f, 
-5678567.12341f));
+            put(DoubleSchema.of(), Arrays.asList(5678567.12312d, 
-5678567.12341d));
+            put(BytesSchema.of(), Arrays.asList("my string".getBytes()));
+            put(ByteBufferSchema.of(), 
Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+        }
+    };
+
+    @Test
+    public void allSchemasShouldSupportNull() {
+        for (Schema<?> schema : testData.keySet()) {
+            assertNull(schema.encode(null),
+                "Should support null in " + schema.getSchemaInfo().getName() + 
" serialization");
+            assertNull(schema.decode( null),
+                "Should support null in " + schema.getSchemaInfo().getName() + 
" deserialization");
+        }
+    }
+
+    @Test
+    public void allSchemasShouldRoundtripInput() {
+        for (Map.Entry<Schema, List<Object>> test : testData.entrySet()) {
+            log.info("Test schema {}", test.getKey());
+            for (Object value : test.getValue()) {
+                log.info("Encode : {}", value);
+                assertEquals(value,
+                    test.getKey().decode(test.getKey().encode(value)),
+                    "Should get the original " + 
test.getKey().getSchemaInfo().getName() +
+                        " after serialization and deserialization");
+            }
+        }
+    }
+
+    @Test
+    public void allSchemasShouldHaveSchemaType() {
+        assertEquals(SchemaType.INT8, 
ByteSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT16, 
ShortSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT32, 
IntSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT64, 
LongSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.FLOAT, 
FloatSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.DOUBLE, 
DoubleSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.STRING, 
StringSchema.utf8().getSchemaInfo().getType());
+        assertEquals(SchemaType.BYTES, 
BytesSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.BYTEBUFFER, 
ByteBufferSchema.of().getSchemaInfo().getType());
+
+    }
+
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7f9918d92..f1db6136a6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -66,6 +66,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -82,7 +83,8 @@
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -535,8 +537,13 @@ public void connectionOpened(final ClientCnx cnx) {
             builder.recycle();
         }
 
+        SchemaInfo si = schema.getSchemaInfo();
+        if (SchemaType.BYTES == si.getType()) {
+            // don't set schema for Schema.BYTES
+            si = null;
+        }
         ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), 
schema.getSchemaInfo());
+                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), 
si);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 9401e7cd20..855151012b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -900,6 +900,9 @@ public void connectionOpened(final ClientCnx cnx) {
                         JSONSchema jsonSchema = (JSONSchema) schema;
                         schemaInfo = 
jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                     }
+                } else if (schema.getSchemaInfo().getType() == 
SchemaType.BYTES) {
+                    // don't set schema info for Schema.BYTES
+                    schemaInfo = null;
                 } else {
                     schemaInfo = schema.getSchemaInfo();
                 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 97214447d7..279b628ad2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -27,6 +27,7 @@
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import lombok.experimental.Accessors;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
@@ -34,6 +35,7 @@
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
+@Accessors(chain = true)
 public class SchemaInfo {
 
     @EqualsAndHashCode.Exclude
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 88adb532a4..87c0956a32 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -32,6 +32,46 @@
      */
     STRING,
 
+    /**
+     * An 8-bit integer.
+     */
+    INT8,
+
+    /**
+     * A 16-bit integer.
+     */
+    INT16,
+
+    /**
+     * A 32-bit integer.
+     */
+    INT32,
+
+    /**
+     * A 64-bit integer.
+     */
+    INT64,
+
+    /**
+     * A single-precision (32-bit) floating-point number.
+     */
+    FLOAT,
+
+    /**
+     * A double-precision (64-bit) floating-point number.
+     */
+    DOUBLE,
+
+    /**
+     * A bytes array.
+     */
+    BYTES,
+
+    /**
+     * A bytebuffer.
+     */
+    BYTEBUFFER,
+
     /**
      * JSON object encoding and validation
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to