This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ac13009 Modify the schema decode method can decode ByteBuf (#5123) ac13009 is described below commit ac13009c5990d99fc935f7b83ad10d4c79772990 Author: congbo <39078850+congbobo...@users.noreply.github.com> AuthorDate: Thu Sep 19 20:25:52 2019 +0800 Modify the schema decode method can decode ByteBuf (#5123) --- .../pulsar/client/api/schema/SchemaReader.java | 25 ++++++++- .../pulsar/client/impl/schema/AbstractSchema.java | 64 ++++++++++++++++++++++ .../pulsar/client/impl/schema/BooleanSchema.java | 12 +++- .../pulsar/client/impl/schema/ByteBufSchema.java | 12 +++- .../client/impl/schema/ByteBufferSchema.java | 18 +++++- .../pulsar/client/impl/schema/ByteSchema.java | 20 ++++++- .../pulsar/client/impl/schema/BytesSchema.java | 16 +++++- .../pulsar/client/impl/schema/DateSchema.java | 14 ++++- .../pulsar/client/impl/schema/DoubleSchema.java | 27 ++++++++- .../pulsar/client/impl/schema/FloatSchema.java | 26 ++++++++- .../pulsar/client/impl/schema/IntSchema.java | 27 ++++++++- .../pulsar/client/impl/schema/LongSchema.java | 26 ++++++++- .../pulsar/client/impl/schema/ShortSchema.java | 26 ++++++++- .../pulsar/client/impl/schema/StringSchema.java | 29 +++++++++- .../pulsar/client/impl/schema/StructSchema.java | 22 +++++++- .../pulsar/client/impl/schema/TimeSchema.java | 14 ++++- .../pulsar/client/impl/schema/TimestampSchema.java | 14 ++++- .../impl/schema/generic/GenericAvroReader.java | 30 +++++++++- .../impl/schema/generic/GenericJsonReader.java | 27 ++++++++- .../client/impl/schema/reader/AvroReader.java | 32 ++++++++++- .../client/impl/schema/reader/JsonReader.java | 24 +++++++- .../client/impl/schema/reader/ProtobufReader.java | 26 ++++++++- .../pulsar/client/impl/schema/AvroSchemaTest.java | 26 +++++++++ .../client/impl/schema/BooleanSchemaTest.java | 15 ++++- .../pulsar/client/impl/schema/BytesSchemaTest.java | 7 +++ .../pulsar/client/impl/schema/DateSchemaTest.java | 17 +++++- .../client/impl/schema/DoubleSchemaTest.java | 13 ++++- .../pulsar/client/impl/schema/FloatSchemaTest.java | 17 +++++- .../pulsar/client/impl/schema/IntSchemaTest.java | 15 ++++- .../pulsar/client/impl/schema/JSONSchemaTest.java | 24 ++++++++ .../pulsar/client/impl/schema/LongSchemaTest.java | 17 +++++- .../client/impl/schema/PrimitiveSchemaTest.java | 7 ++- .../client/impl/schema/ProtobufSchemaTest.java | 16 ++++++ .../pulsar/client/impl/schema/ShortSchemaTest.java | 15 ++++- .../client/impl/schema/StringSchemaTest.java | 23 ++++++++ .../pulsar/client/impl/schema/TimeSchemaTest.java | 10 +++- .../client/impl/schema/TimestampSchemaTest.java | 12 +++- 37 files changed, 708 insertions(+), 57 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java index 29155cc..d073387 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.client.api.schema; +import java.io.InputStream; + /** * Deserialize messages from bytes. */ + public interface SchemaReader<T> { /** @@ -29,5 +32,25 @@ public interface SchemaReader<T> { * @param bytes the data * @return the serialized object */ - T read(byte[] bytes); + default T read(byte[] bytes) { + return read(bytes, 0, bytes.length); + } + + /** + * serialize bytes convert pojo. + * + * @param bytes the data + * @param offset the byte[] initial position + * @param length the byte[] read length + * @return the serialized object + */ + T read(byte[] bytes, int offset, int length); + + /** + * serialize bytes convert pojo. + * + * @param inputStream the stream of message + * @return the serialized object + */ + T read(InputStream inputStream); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java new file mode 100644 index 0000000..f459d5c --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; + +abstract class AbstractSchema<T> implements Schema<T> { + + /** + * Check if the message read able length length is a valid object for this schema. + * + * <p>The implementation can choose what its most efficient approach to validate the schema. + * If the implementation doesn't provide it, it will attempt to use {@link #decode(ByteBuf)} + * to see if this schema can decode this message or not as a validation mechanism to verify + * the bytes. + * + * @param byteBuf the messages to verify + * @return true if it is a valid message + * @throws SchemaSerializationException if it is not a valid message + */ + void validate(ByteBuf byteBuf) { + throw new SchemaSerializationException("This method is not supported"); + }; + + /** + * Decode a byteBuf into an object using the schema definition and deserializer implementation + * + * @param byteBuf + * the byte buffer to decode + * @return the deserialized object + */ + abstract T decode(ByteBuf byteBuf); + /** + * Decode a byteBuf into an object using a given version. + * + * @param byteBuf + * the byte array to decode + * @param schemaVersion + * the schema version to decode the object. null indicates using latest version. + * @return the deserialized object + */ + T decode(ByteBuf byteBuf, byte[] schemaVersion) { + // ignore version by default (most of the primitive schema implementations ignore schema version) + return decode(byteBuf); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java index 5b27dfc..c38c356 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Boolean`. */ -public class BooleanSchema implements Schema<Boolean> { +public class BooleanSchema extends AbstractSchema<Boolean> { public static BooleanSchema of() { return INSTANCE; @@ -64,6 +64,14 @@ public class BooleanSchema implements Schema<Boolean> { } @Override + public Boolean decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + return byteBuf.getBoolean(0); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java index 4e7e6d0..e08c80a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java @@ -21,14 +21,13 @@ package org.apache.pulsar.client.impl.schema; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; /** * A variant `Bytes` schema that takes {@link io.netty.buffer.ByteBuf}. */ -public class ByteBufSchema implements Schema<ByteBuf> { +public class ByteBufSchema extends AbstractSchema<ByteBuf> { public static ByteBufSchema of() { return INSTANCE; @@ -59,6 +58,15 @@ public class ByteBufSchema implements Schema<ByteBuf> { } @Override + public ByteBuf decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } else { + return byteBuf; + } + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java index 251cd93..c983aa7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java @@ -19,14 +19,16 @@ package org.apache.pulsar.client.impl.schema; import java.nio.ByteBuffer; -import org.apache.pulsar.client.api.Schema; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; /** * A bytebuffer schema is effectively a `BYTES` schema. */ -public class ByteBufferSchema implements Schema<ByteBuffer> { +public class ByteBufferSchema extends AbstractSchema<ByteBuffer> { public static ByteBufferSchema of() { return INSTANCE; @@ -69,6 +71,18 @@ public class ByteBufferSchema implements Schema<ByteBuffer> { } @Override + public ByteBuffer decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } else { + int size = byteBuf.readableBytes(); + byte[] bytes = new byte[size]; + byteBuf.readBytes(bytes); + return ByteBuffer.wrap(bytes); + } + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java index d4635f2..d08660f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for 'Byte'. */ -public class ByteSchema implements Schema<Byte> { +public class ByteSchema extends AbstractSchema<Byte> { public static ByteSchema of() { return INSTANCE; @@ -46,6 +46,13 @@ public class ByteSchema implements Schema<Byte> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 1) { + throw new SchemaSerializationException("Size of data received by ByteSchema is not 1"); + } + } + + @Override public byte[] encode(Byte message) { if (null == message) { return null; @@ -64,6 +71,15 @@ public class ByteSchema implements Schema<Byte> { } @Override + public Byte decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + return byteBuf.getByte(0); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java index 9a94fcd..6bf8923 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java @@ -18,14 +18,14 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; /** * A schema for bytes array. */ -public class BytesSchema implements Schema<byte[]> { +public class BytesSchema extends AbstractSchema<byte[]> { public static BytesSchema of() { return INSTANCE; @@ -48,6 +48,18 @@ public class BytesSchema implements Schema<byte[]> { } @Override + public byte[] decode(ByteBuf byteBuf) { + if (byteBuf == null) { + return null; + } + int size = byteBuf.readableBytes(); + byte[] bytes = new byte[size]; + + byteBuf.readBytes(bytes, 0, size); + return bytes; + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java index 819a7d4..7753b44 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -27,7 +27,7 @@ import java.util.Date; /** * A schema for `java.util.Date` or `java.sql.Date`. */ -public class DateSchema implements Schema<Date> { +public class DateSchema extends AbstractSchema<Date> { public static DateSchema of() { return INSTANCE; } @@ -59,6 +59,16 @@ public class DateSchema implements Schema<Date> { } @Override + public Date decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + + Long decode = LongSchema.of().decode(byteBuf); + return new Date(decode); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java index e617efb..4ff64b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Double`. */ -public class DoubleSchema implements Schema<Double> { +public class DoubleSchema extends AbstractSchema<Double> { public static DoubleSchema of() { return INSTANCE; @@ -46,6 +46,14 @@ public class DoubleSchema implements Schema<Double> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 8) { + throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8"); + } + } + + + @Override public byte[] encode(Double message) { if (null == message) { return null; @@ -79,6 +87,21 @@ public class DoubleSchema implements Schema<Double> { } @Override + public Double decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + long value = 0; + + for (int i = 0; i < 8; i ++) { + value <<= 8; + value |= byteBuf.getByte(i) & 0xFF; + } + return Double.longBitsToDouble(value); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java index 32ac469..7741b38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Float`. */ -public class FloatSchema implements Schema<Float> { +public class FloatSchema extends AbstractSchema<Float> { public static FloatSchema of() { return INSTANCE; @@ -46,6 +46,13 @@ public class FloatSchema implements Schema<Float> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 4) { + throw new SchemaSerializationException("Size of data received by FloatSchema is not 4"); + } + } + + @Override public byte[] encode(Float message) { if (null == message) { return null; @@ -75,6 +82,21 @@ public class FloatSchema implements Schema<Float> { } @Override + public Float decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + int value = 0; + for (int i = 0; i < 4; i++) { + value <<= 8; + value |= byteBuf.getByte(i) & 0xFF; + } + + return Float.intBitsToFloat(value); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java index 90092a4..63aa0b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Integer`. */ -public class IntSchema implements Schema<Integer> { +public class IntSchema extends AbstractSchema<Integer> { public static IntSchema of() { return INSTANCE; @@ -46,6 +46,13 @@ public class IntSchema implements Schema<Integer> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 4) { + throw new SchemaSerializationException("Size of data received by IntSchema is not 4"); + } + } + + @Override public byte[] encode(Integer message) { if (null == message) { return null; @@ -74,6 +81,22 @@ public class IntSchema implements Schema<Integer> { } @Override + public Integer decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + int value = 0; + + for (int i = 0; i < 4; i++) { + value <<= 8; + value |= byteBuf.getByte(i) & 0xFF; + } + + return value; + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java index b252279..4b4da71 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Long`. */ -public class LongSchema implements Schema<Long> { +public class LongSchema extends AbstractSchema<Long> { public static LongSchema of() { return INSTANCE; @@ -46,6 +46,13 @@ public class LongSchema implements Schema<Long> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 8) { + throw new SchemaSerializationException("Size of data received by LongSchema is not 8"); + } + } + + @Override public byte[] encode(Long data) { if (null == data) { return null; @@ -78,6 +85,21 @@ public class LongSchema implements Schema<Long> { } @Override + public Long decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + long value = 0L; + for (int i = 0; i < 8; i++) { + value <<= 8; + value |= byteBuf.getByte(i) & 0xFF; + } + + return value; + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java index f1ec133..932f2df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -26,7 +26,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * A schema for `Short`. */ -public class ShortSchema implements Schema<Short> { +public class ShortSchema extends AbstractSchema<Short> { public static ShortSchema of() { return INSTANCE; @@ -46,6 +46,13 @@ public class ShortSchema implements Schema<Short> { } @Override + public void validate(ByteBuf message) { + if (message.readableBytes() != 2) { + throw new SchemaSerializationException("Size of data received by ShortSchema is not 2"); + } + } + + @Override public byte[] encode(Short message) { if (null == message) { return null; @@ -72,6 +79,21 @@ public class ShortSchema implements Schema<Short> { } @Override + public Short decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + validate(byteBuf); + short value = 0; + + for (int i = 0; i < 2; i++) { + value <<= 8; + value |= byteBuf.getByte(i) & 0xFF; + } + return value; + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java index 6859503..3c7959f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java @@ -22,7 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument; import java.util.HashMap; import java.util.Map; -import org.apache.pulsar.client.api.Schema; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -32,7 +34,7 @@ import java.nio.charset.StandardCharsets; /** * Schema definition for Strings encoded in UTF-8 format. */ -public class StringSchema implements Schema<String> { +public class StringSchema extends AbstractSchema<String> { static final String CHARSET_KEY = "__charset"; @@ -40,6 +42,13 @@ public class StringSchema implements Schema<String> { return UTF8; } + private static final FastThreadLocal<byte[]> tmpBuffer = new FastThreadLocal<byte[]>() { + @Override + protected byte[] initialValue() { + return new byte[1024]; + } + }; + public static StringSchema fromSchemaInfo(SchemaInfo schemaInfo) { checkArgument(SchemaType.STRING == schemaInfo.getType(), "Not a string schema"); String charsetName = schemaInfo.getProperties().get(CHARSET_KEY); @@ -93,6 +102,22 @@ public class StringSchema implements Schema<String> { } } + public String decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } else { + int size = byteBuf.readableBytes(); + byte[] bytes = tmpBuffer.get(); + if (size > bytes.length) { + bytes = new byte[size * 2]; + tmpBuffer.set(bytes); + } + byteBuf.readBytes(bytes, 0, size); + + return new String(bytes, 0, size, charset); + } + } + public SchemaInfo getSchemaInfo() { return schemaInfo; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java index a6608f4..b38df41 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java @@ -26,12 +26,13 @@ import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import org.apache.avro.Schema.Parser; import org.apache.avro.reflect.ReflectData; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.api.schema.SchemaReader; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; * {@link org.apache.pulsar.common.schema.SchemaType#JSON}, * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}. */ -public abstract class StructSchema<T> implements Schema<T> { +public abstract class StructSchema<T> extends AbstractSchema<T> { protected static final Logger LOG = LoggerFactory.getLogger(StructSchema.class); @@ -61,6 +62,7 @@ public abstract class StructSchema<T> implements Schema<T> { protected SchemaReader<T> reader; protected SchemaWriter<T> writer; protected SchemaInfoProvider schemaInfoProvider; + private final LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() { @Override @@ -100,6 +102,22 @@ public abstract class StructSchema<T> implements Schema<T> { } @Override + public T decode(ByteBuf byteBuf) { + return reader.read(new ByteBufInputStream(byteBuf)); + } + + @Override + public T decode(ByteBuf byteBuf, byte[] schemaVersion) { + try { + return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf)); + } catch (ExecutionException e) { + LOG.error("Can't get generic schema for topic {} schema version {}", + schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e); + throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName()); + } + } + + @Override public SchemaInfo getSchemaInfo() { return this.schemaInfo; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java index 212b555..66aa4f9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -27,7 +27,7 @@ import java.sql.Time; /** * A schema for `java.sql.Time`. */ -public class TimeSchema implements Schema<Time> { +public class TimeSchema extends AbstractSchema<Time> { public static TimeSchema of() { return INSTANCE; } @@ -59,6 +59,16 @@ public class TimeSchema implements Schema<Time> { } @Override + public Time decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + + Long decode = LongSchema.of().decode(byteBuf); + return new Time(decode); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java index de8646f..9bfbaba 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; -import org.apache.pulsar.client.api.Schema; +import io.netty.buffer.ByteBuf; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -27,7 +27,7 @@ import java.sql.Timestamp; /** * A schema for `java.sql.Timestamp`. */ -public class TimestampSchema implements Schema<Timestamp> { +public class TimestampSchema extends AbstractSchema<Timestamp> { public static TimestampSchema of() { return INSTANCE; } @@ -59,6 +59,16 @@ public class TimestampSchema implements Schema<Timestamp> { } @Override + public Timestamp decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + + Long decode = LongSchema.of().decode(byteBuf); + return new Timestamp(decode); + } + + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java index bc4f65e..a276cb9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java @@ -29,8 +29,12 @@ import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.stream.Collectors; @@ -64,9 +68,9 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> { } @Override - public GenericAvroRecord read(byte[] bytes) { + public GenericAvroRecord read(byte[] bytes, int offset, int length) { try { - Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); + Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, null); org.apache.avro.generic.GenericRecord avroRecord = (org.apache.avro.generic.GenericRecord)reader.read( null, @@ -76,4 +80,26 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> { throw new SchemaSerializationException(e); } } + + @Override + public GenericRecord read(InputStream inputStream) { + try { + Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + org.apache.avro.generic.GenericRecord avroRecord = + (org.apache.avro.generic.GenericRecord)reader.read( + null, + decoder); + return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + log.error("GenericAvroReader close inputStream close error", e.getMessage()); + } + } + } + + private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java index 1d2fced..9eeb15f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java @@ -24,9 +24,12 @@ import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaReader; -import org.apache.pulsar.common.schema.SchemaInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; @@ -48,12 +51,30 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> { this.schemaVersion = schemaVersion; } @Override - public GenericJsonRecord read(byte[] bytes) { + public GenericJsonRecord read(byte[] bytes, int offset, int length) { + try { + JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8)); + return new GenericJsonRecord(schemaVersion, fields, jn); + } catch (IOException ioe) { + throw new SchemaSerializationException(ioe); + } + } + + @Override + public GenericRecord read(InputStream inputStream) { try { - JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8)); + JsonNode jn = objectMapper.readTree(inputStream); return new GenericJsonRecord(schemaVersion, fields, jn); } catch (IOException ioe) { throw new SchemaSerializationException(ioe); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + log.error("GenericJsonReader close inputStream close error", e.getMessage()); + } } } + + private static final Logger log = LoggerFactory.getLogger(GenericJsonReader.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java index c502df5..dc5bfe6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java @@ -25,7 +25,11 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.io.InputStream; public class AvroReader<T> implements SchemaReader<T> { @@ -42,17 +46,39 @@ public class AvroReader<T> implements SchemaReader<T> { } @Override - public T read(byte[] bytes) { + public T read(byte[] bytes, int offset, int length) { + try { + BinaryDecoder decoderFromCache = decoders.get(); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, decoderFromCache); + if (decoderFromCache == null) { + decoders.set(decoder); + } + return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, offset, length, decoder)); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public T read(InputStream inputStream) { try { BinaryDecoder decoderFromCache = decoders.get(); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, decoderFromCache); if (decoderFromCache == null) { decoders.set(decoder); } - return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder)); + return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, decoder)); } catch (IOException e) { throw new SchemaSerializationException(e); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + log.error("AvroReader close inputStream close error", e.getMessage()); + } } } + private static final Logger log = LoggerFactory.getLogger(AvroReader.class); + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java index 05ef7cb..7867ddd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java @@ -21,8 +21,11 @@ package org.apache.pulsar.client.impl.schema.reader; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; public class JsonReader<T> implements SchemaReader<T> { private final Class<T> pojo; @@ -34,11 +37,28 @@ public class JsonReader<T> implements SchemaReader<T> { } @Override - public T read(byte[] bytes) { + public T read(byte[] bytes, int offset, int length) { try { - return objectMapper.readValue(bytes, this.pojo); + return objectMapper.readValue(bytes, offset, length, this.pojo); } catch (IOException e) { throw new SchemaSerializationException(e); } } + + @Override + public T read(InputStream inputStream) { + try { + return objectMapper.readValue(inputStream, pojo); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + log.error("JsonReader close inputStream close error", e.getMessage()); + } + } + } + + private static final Logger log = LoggerFactory.getLogger(JsonReader.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java index bca952e..01c0c12 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java @@ -23,6 +23,12 @@ import com.google.protobuf.Parser; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaReader<T> { private Parser<T> tParser; @@ -31,12 +37,28 @@ public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> im } @Override - public T read(byte[] bytes) { + public T read(byte[] bytes, int offset, int length) { + try { + return this.tParser.parseFrom(bytes, offset, length); + } catch (InvalidProtocolBufferException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public T read(InputStream inputStream) { try { - return this.tParser.parseFrom(bytes); + return this.tParser.parseFrom(inputStream); } catch (InvalidProtocolBufferException e) { throw new SchemaSerializationException(e); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + log.error("ProtobufReader close inputStream close error", e.getMessage()); + } } } + private static final Logger log = LoggerFactory.getLogger(ProtobufReader.class); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index 6e31dbd..62acef4 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -30,6 +30,8 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.Date; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -302,4 +304,28 @@ public class AvroSchemaTest { assertEquals(object, nasaMission); } + @Test + public void testDecodeByteBuf() { + AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); + + Foo foo1 = new Foo(); + foo1.setField1("foo1"); + foo1.setField2("bar1"); + foo1.setField4(new Bar()); + foo1.setFieldUnableNull("notNull"); + + Foo foo2 = new Foo(); + foo2.setField1("foo2"); + foo2.setField2("bar2"); + + byte[] bytes1 = avroSchema.encode(foo1); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes1.length); + byteBuf.writeBytes(bytes1); + + Foo object1 = avroSchema.decode(byteBuf); + Assert.assertTrue(bytes1.length > 0); + assertEquals(object1, foo1); + + } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java index 9f62d1a..fdcf471 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BooleanSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -54,12 +56,23 @@ public class BooleanSchemaTest { BooleanSchema schema = BooleanSchema.of(); Assert.assertEquals(new Boolean(true), schema.decode(trueBytes)); Assert.assertEquals(new Boolean(false), schema.decode(falseBytes)); + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(1); + byteBuf.writeBytes(trueBytes); + Assert.assertEquals(new Boolean(true), schema.decode(byteBuf)); + byteBuf.writerIndex(0); + byteBuf.writeBytes(falseBytes); + + Assert.assertEquals(new Boolean(false), schema.decode(byteBuf)); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(BooleanSchema.of().encode(null)); - Assert.assertNull(BooleanSchema.of().decode(null)); + Assert.assertNull(BooleanSchema.of().decode(byteBuf)); + Assert.assertNull(BooleanSchema.of().decode(bytes)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java index 9a2b6ca..48bd5bc 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/BytesSchemaTest.java @@ -19,8 +19,11 @@ package org.apache.pulsar.client.impl.schema; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertSame; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.apache.pulsar.client.api.Schema; import org.testng.annotations.Test; @@ -47,6 +50,10 @@ public class BytesSchemaTest { byte[] deserializedData = schema.decode(serializedData); assertSame(data, deserializedData); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(deserializedData.length); + byteBuf.writeBytes(deserializedData); + assertEquals(data, ((BytesSchema)schema).decode(byteBuf)); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java index 0e707ed..69499d8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DateSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -46,7 +48,11 @@ public class DateSchemaTest { public void testSchemaEncodeDecodeFidelity() { DateSchema schema = DateSchema.of(); Date date = new Date(); - Assert.assertEquals(date, schema.decode(schema.encode(date))); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byte[] bytes = schema.encode(date); + byteBuf.writeBytes(bytes); + Assert.assertEquals(date, schema.decode(bytes)); + Assert.assertEquals(date, schema.decode(byteBuf)); } @Test @@ -63,13 +69,20 @@ public class DateSchemaTest { }; long expected = 10*65536 + 24*256 + 42; DateSchema schema = DateSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byteBuf.writeBytes(byteData); Assert.assertEquals(expected, schema.decode(byteData).getTime()); + Assert.assertEquals(expected, schema.decode(byteBuf).getTime()); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; + Assert.assertNull(DateSchema.of().encode(null)); - Assert.assertNull(DateSchema.of().decode(null)); + Assert.assertNull(DateSchema.of().decode(byteBuf)); + Assert.assertNull(DateSchema.of().decode(bytes)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java index f5a1ab3..6fc76de 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DoubleSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -45,13 +47,20 @@ public class DoubleSchemaTest { public void testSchemaEncodeDecodeFidelity() { DoubleSchema schema = DoubleSchema.of(); Double dbl = new Double(1234578.8754321); - Assert.assertEquals(dbl, schema.decode(schema.encode(dbl))); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byte[] bytes = schema.encode(dbl); + byteBuf.writeBytes(bytes); + Assert.assertEquals(dbl, schema.decode(bytes)); + Assert.assertEquals(dbl, schema.decode(byteBuf)); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(DoubleSchema.of().encode(null)); - Assert.assertNull(DoubleSchema.of().decode(null)); + Assert.assertNull(DoubleSchema.of().decode(byteBuf)); + Assert.assertNull(DoubleSchema.of().decode(bytes)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java index b915582..5d5dd0b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/FloatSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,14 +42,23 @@ public class FloatSchemaTest { @Test public void testSchemaEncodeDecodeFidelity() { FloatSchema schema = FloatSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4); Float dbl = new Float(1234578.8754321); - Assert.assertEquals(dbl, schema.decode(schema.encode(dbl))); + byte[] bytes = schema.encode(dbl); + byteBuf.writeBytes(schema.encode(dbl)); + Assert.assertEquals(dbl, schema.decode(bytes)); + Assert.assertEquals(dbl, schema.decode(byteBuf)); + } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(FloatSchema.of().encode(null)); - Assert.assertNull(FloatSchema.of().decode(null)); + Assert.assertNull(FloatSchema.of().decode(bytes)); + Assert.assertNull(FloatSchema.of().decode(byteBuf)); } - } + + diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java index fb17811..98f00ec 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/IntSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,10 +42,15 @@ public class IntSchemaTest { public void testSchemaEncodeDecodeFidelity() { IntSchema schema = IntSchema.of(); int start = 348592040; + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4); for (int i = 0; i < 100; ++i) { byte[] encode = schema.encode(start + i); + byteBuf.writerIndex(0); + byteBuf.writeBytes(encode); int decoded = schema.decode(encode); Assert.assertEquals(decoded, start + i); + decoded = schema.decode(byteBuf); + Assert.assertEquals(decoded, start + i); } } @@ -55,15 +62,21 @@ public class IntSchemaTest { 24, 42 }; + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(4); Integer expected = 10*65536 + 24*256 + 42; IntSchema schema = IntSchema.of(); + byteBuf.writeBytes(byteData); Assert.assertEquals(expected, schema.decode(byteData)); + Assert.assertEquals(expected, schema.decode(byteBuf)); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(IntSchema.of().encode(null)); - Assert.assertNull(IntSchema.of().decode(null)); + Assert.assertNull(IntSchema.of().decode(bytes)); + Assert.assertNull(IntSchema.of().decode(byteBuf)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java index 5efb82b..f280113 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java @@ -21,6 +21,8 @@ package org.apache.pulsar.client.impl.schema; import java.util.Collections; import java.util.List; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; @@ -302,4 +304,26 @@ public class JSONSchemaTest { JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()); jsonSchema.decode(new byte[0]); } + + @Test + public void testDecodeByteBuf() { + JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()); + + Foo foo1 = new Foo(); + foo1.setField1("foo1"); + foo1.setField2("bar1"); + foo1.setField4(new Bar()); + foo1.setFieldUnableNull("notNull"); + + Foo foo2 = new Foo(); + foo2.setField1("foo2"); + foo2.setField2("bar2"); + + byte[] bytes1 = jsonSchema.encode(foo1); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes1.length); + byteBuf.writeBytes(bytes1); + Assert.assertTrue(bytes1.length > 0); + assertEquals(jsonSchema.decode(byteBuf), foo1); + + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java index 84c217c..397f5ac 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LongSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -43,11 +45,17 @@ public class LongSchemaTest { @Test public void testSchemaEncodeDecodeFidelity() { LongSchema longSchema = LongSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); long start = 348592040; for (int i = 0; i < 100; ++i) { byte[] encode = longSchema.encode(start + i); long decoded = longSchema.decode(encode); Assert.assertEquals(decoded, start + i); + byteBuf.writerIndex(0); + byteBuf.writeBytes(encode); + + decoded = longSchema.decode(byteBuf); + Assert.assertEquals(decoded, start + i); } } @@ -65,13 +73,20 @@ public class LongSchemaTest { }; Long expected = 10*65536l + 24*256 + 42; LongSchema longSchema = LongSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byteBuf.writeBytes(byteData); + Assert.assertEquals(expected, longSchema.decode(byteData)); + Assert.assertEquals(expected, longSchema.decode(byteBuf)); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(LongSchema.of().encode(null)); - Assert.assertNull(LongSchema.of().decode(null)); + Assert.assertNull(LongSchema.of().decode(byteBuf)); + Assert.assertNull(LongSchema.of().decode(bytes)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java index 31d1c14..264bf4c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import java.sql.Date; @@ -88,10 +89,14 @@ public class PrimitiveSchemaTest { @Test(dataProvider = "schemas") public void allSchemasShouldSupportNull(Map<Schema, List<Object>> testData) { for (Schema<?> schema : testData.keySet()) { + byte[] bytes = null; + ByteBuf byteBuf = null; assertNull(schema.encode(null), "Should support null in " + schema.getSchemaInfo().getName() + " serialization"); - assertNull(schema.decode( null), + assertNull(schema.decode(bytes), "Should support null in " + schema.getSchemaInfo().getName() + " deserialization"); + assertNull(((AbstractSchema)schema).decode(byteBuf), + "Should support null in " + schema.getSchemaInfo().getName() + " deserialization"); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java index 57ccd56..fc03412 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java @@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl.schema; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.pulsar.common.schema.SchemaType; @@ -111,4 +113,18 @@ public class ProtobufSchemaTest { Assert.assertEquals(new ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()), EXPECTED_PARSING_INFO); } + + @Test + public void testDecodeByteBuf() throws JsonProcessingException { + ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema + = ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + org.apache.pulsar.client.schema.proto.Test.TestMessage testMessage = + org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build(); + byte[] bytes = protobufSchema.encode(org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build()); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length); + byteBuf.writeBytes(bytes); + + Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf)); + + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java index 1252dc9..54fd35b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ShortSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,11 +39,16 @@ public class ShortSchemaTest { @Test public void testSchemaEncodeDecodeFidelity() { ShortSchema schema = ShortSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(2); short start = 3440; for (short i = 0; i < 100; ++i) { byte[] encode = schema.encode((short)(start + i)); + byteBuf.writerIndex(0); + byteBuf.writeBytes(encode); int decoded = schema.decode(encode); Assert.assertEquals(decoded, start + i); + decoded = schema.decode(byteBuf); + Assert.assertEquals(decoded, start + i); } } @@ -53,13 +60,19 @@ public class ShortSchemaTest { }; Short expected = 24*256 + 42; ShortSchema schema = ShortSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(2); + byteBuf.writeBytes(byteData); Assert.assertEquals(expected, schema.decode(byteData)); + Assert.assertEquals(expected, schema.decode(byteBuf)); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(ShortSchema.of().encode(null)); - Assert.assertNull(ShortSchema.of().decode(null)); + Assert.assertNull(ShortSchema.of().decode(byteBuf)); + Assert.assertNull(ShortSchema.of().decode(bytes)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java index 6aa9dd7..b09bf4d 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII; import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; @@ -29,6 +30,9 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.HashMap; import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.testng.annotations.DataProvider; @@ -51,6 +55,11 @@ public class StringSchemaTest { String decodedString = schema.decode(data); assertEquals(decodedString, myString); + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length); + byteBuf.writeBytes(data); + + assertEquals(schema.decode(byteBuf), myString); } @Test @@ -69,6 +78,11 @@ public class StringSchemaTest { String decodedString = schema.decode(data); assertEquals(decodedString, myString); + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length); + byteBuf.writeBytes(data); + + assertEquals(schema.decode(byteBuf), myString); } @Test @@ -86,6 +100,10 @@ public class StringSchemaTest { String decodedString = schema.decode(data); assertEquals(decodedString, myString); + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length); + byteBuf.writeBytes(data); + assertEquals(schema.decode(byteBuf), myString); } @DataProvider(name = "charsets") @@ -117,6 +135,11 @@ public class StringSchemaTest { String decodedString = schema.decode(data); assertEquals(decodedString, myString); + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(data.length); + byteBuf.writeBytes(data); + + assertEquals(schema.decode(byteBuf), myString); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java index 7b5cd56..fab20e7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimeSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -61,15 +63,21 @@ public class TimeSchemaTest { 24, 42 }; + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(byteData.length); + byteBuf.writeBytes(byteData); long expected = 10*65536 + 24*256 + 42; TimeSchema schema = TimeSchema.of(); Assert.assertEquals(expected, schema.decode(byteData).getTime()); + Assert.assertEquals(expected, schema.decode(byteBuf).getTime()); } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(TimeSchema.of().encode(null)); - Assert.assertNull(TimeSchema.of().decode(null)); + Assert.assertNull(TimeSchema.of().decode(bytes)); + Assert.assertNull(TimeSchema.of().decode(byteBuf)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java index 1d864b6..022ba38 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/TimestampSchemaTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.schema; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import org.testng.Assert; import org.testng.annotations.Test; @@ -61,15 +63,23 @@ public class TimestampSchemaTest { 24, 42 }; + + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(byteData.length); + byteBuf.writeBytes(byteData); long expected = 10*65536 + 24*256 + 42; TimestampSchema schema = TimestampSchema.of(); Assert.assertEquals(expected, schema.decode(byteData).getTime()); + Assert.assertEquals(expected, schema.decode(byteBuf).getTime()); + } @Test public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; Assert.assertNull(TimestampSchema.of().encode(null)); - Assert.assertNull(TimestampSchema.of().decode(null)); + Assert.assertNull(TimestampSchema.of().decode(byteBuf)); + Assert.assertNull(TimestampSchema.of().decode(bytes)); } }