Repository: kafka Updated Branches: refs/heads/trunk d9479275f -> dc5bf4bd4
KAFKA-5218; New Short serializer, deserializer, serde Author: Mario Molina <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Damian Guy <[email protected]>, Michael G. Noll <[email protected]>, Guozhang Wang <[email protected]> Closes #3017 from mmolimar/KAFKA-5218 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc5bf4bd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc5bf4bd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc5bf4bd Branch: refs/heads/trunk Commit: dc5bf4bd453495af050ea7c0ec7a66b8d2b2e8d4 Parents: d947927 Author: Mario Molina <[email protected]> Authored: Wed May 31 15:09:58 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 31 15:09:58 2017 -0700 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- .../serialization/IntegerDeserializer.java | 3 +- .../common/serialization/LongDeserializer.java | 3 +- .../kafka/common/serialization/Serdes.java | 17 ++ .../common/serialization/ShortDeserializer.java | 47 ++++ .../common/serialization/ShortSerializer.java | 40 +++ .../common/serialization/SerializationTest.java | 249 +++++-------------- 7 files changed, 176 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a8d3033..f2fb3d9 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -52,7 +52,7 @@ files="AbstractRequest.java"/> <suppress checks="NPathComplexity" - files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/> + files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes).java"/> <!-- clients tests --> <suppress checks="ClassDataAbstractionCoupling" http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java index 29c3acc..45f8cf1 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java @@ -30,8 +30,7 @@ public class IntegerDeserializer implements Deserializer<Integer> { if (data == null) return null; if (data.length != 4) { - throw new SerializationException("Size of data received by IntegerDeserializer is " + - "not 4"); + throw new SerializationException("Size of data received by IntegerDeserializer is not 4"); } int value = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java index e5bfe3c..a58b1d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java @@ -30,8 +30,7 @@ public class LongDeserializer implements Deserializer<Long> { if (data == null) return null; if (data.length != 8) { - throw new SerializationException("Size of data received by LongDeserializer is " + - "not 8"); + throw new SerializationException("Size of data received by LongDeserializer is not 8"); } long value = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java index 0793321..4772ea5 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java @@ -70,6 +70,12 @@ public class Serdes { } } + static public final class ShortSerde extends WrapperSerde<Short> { + public ShortSerde() { + super(new ShortSerializer(), new ShortDeserializer()); + } + } + static public final class FloatSerde extends WrapperSerde<Float> { public FloatSerde() { super(new FloatSerializer(), new FloatDeserializer()); @@ -112,6 +118,10 @@ public class Serdes { return (Serde<T>) String(); } + if (Short.class.isAssignableFrom(type)) { + return (Serde<T>) Short(); + } + if (Integer.class.isAssignableFrom(type)) { return (Serde<T>) Integer(); } @@ -176,6 +186,13 @@ public class Serdes { } /* + * A serde for nullable {@code Short} type. + */ + static public Serde<Short> Short() { + return new ShortSerde(); + } + + /* * A serde for nullable {@code Float} type. */ static public Serde<Float> Float() { http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java new file mode 100644 index 0000000..45aa8ae --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; + +public class ShortDeserializer implements Deserializer<Short> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public Short deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 2) { + throw new SerializationException("Size of data received by ShortDeserializer is not 2"); + } + + short value = 0; + for (byte b : data) { + value <<= 8; + value |= b & 0xFF; + } + return value; + } + + public void close() { + // nothing to do + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java new file mode 100644 index 0000000..a66aaa0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class ShortSerializer implements Serializer<Short> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Short data) { + if (data == null) + return null; + + return new byte[] { + (byte) (data >>> 8), + data.byteValue() + }; + } + + public void close() { + // nothing to do + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/dc5bf4bd/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 12ccbe4..134882f 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -17,10 +17,11 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.utils.Bytes; import org.junit.Test; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,27 +30,50 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; public class SerializationTest { final private String topic = "testTopic"; + final private Map<Class<Object>, List<Object>> testData = new HashMap() { + { + put(String.class, Arrays.asList("my string")); + put(Short.class, Arrays.asList((short) 32767, (short) -32768)); + put(Integer.class, Arrays.asList((int) 423412424, (int) -41243432)); + put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L)); + put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f)); + put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d)); + put(byte[].class, Arrays.asList("my string".getBytes())); + put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes()))); + put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes()))); + } + }; private class DummyClass { - } @Test - public void testSerdeFrom() { - Serde<Long> thisSerde = Serdes.serdeFrom(Long.class); - Serde<Long> otherSerde = Serdes.Long(); - - Long value = 423412424L; + public void allSerdesShouldRoundtripInput() { + for (Map.Entry<Class<Object>, List<Object>> test : testData.entrySet()) { + try (Serde<Object> serde = Serdes.serdeFrom(test.getKey())) { + for (Object value : test.getValue()) { + assertEquals("Should get the original " + test.getKey().getSimpleName() + + " after serialization and deserialization", value, + serde.deserializer().deserialize(topic, serde.serializer().serialize(topic, value))); + } + } + } + } - assertEquals("Should get the original long after serialization and deserialization", - value, thisSerde.deserializer().deserialize(topic, otherSerde.serializer().serialize(topic, value))); - assertEquals("Should get the original long after serialization and deserialization", - value, otherSerde.deserializer().deserialize(topic, thisSerde.serializer().serialize(topic, value))); + @Test + public void allSerdesShouldSupportNull() { + for (Class<?> cls : testData.keySet()) { + try (Serde<?> serde = Serdes.serdeFrom(cls)) { + assertThat("Should support null in " + cls.getSimpleName() + " serialization", + serde.serializer().serialize(topic, null), nullValue()); + assertThat("Should support null in " + cls.getSimpleName() + " deserialization", + serde.deserializer().deserialize(topic, null), nullValue()); + } + } } @Test(expected = IllegalArgumentException.class) @@ -59,200 +83,65 @@ public class SerializationTest { @Test(expected = IllegalArgumentException.class) public void testSerdeFromNotNull() { - Serdes.serdeFrom(null, Serdes.Long().deserializer()); + try (Serde<Long> serde = Serdes.Long()) { + Serdes.serdeFrom(null, serde.deserializer()); + } } @Test - public void testStringSerializer() { + public void stringSerdeShouldSupportDifferentEncodings() { String str = "my string"; - - List<String> encodings = new ArrayList<String>(); - encodings.add("UTF8"); - encodings.add("UTF-16"); + List<String> encodings = Arrays.asList("UTF8", "UTF-16"); for (String encoding : encodings) { - Serde<String> serDeser = getStringSerde(encoding); - Serializer<String> serializer = serDeser.serializer(); - Deserializer<String> deserializer = serDeser.deserializer(); - - assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, - str, deserializer.deserialize(topic, serializer.serialize(topic, str))); + try (Serde<String> serDeser = getStringSerde(encoding)) { - assertEquals("Should support null in serialization and deserialization with encoding " + encoding, - null, deserializer.deserialize(topic, serializer.serialize(topic, null))); + Serializer<String> serializer = serDeser.serializer(); + Deserializer<String> deserializer = serDeser.deserializer(); + assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, + str, deserializer.deserialize(topic, serializer.serialize(topic, str))); + } } } - @Test - public void testIntegerSerializer() { - Integer[] integers = new Integer[]{ - 423412424, - -41243432 - }; - - Serializer<Integer> serializer = Serdes.Integer().serializer(); - Deserializer<Integer> deserializer = Serdes.Integer().deserializer(); - - for (Integer integer : integers) { - assertEquals("Should get the original integer after serialization and deserialization", - integer, deserializer.deserialize(topic, serializer.serialize(topic, integer))); - } - - assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(topic, serializer.serialize(topic, null))); - - serializer.close(); - deserializer.close(); - } - - @Test - public void testLongSerializer() { - Long[] longs = new Long[]{ - 922337203685477580L, - -922337203685477581L - }; - - Serializer<Long> serializer = Serdes.Long().serializer(); - Deserializer<Long> deserializer = Serdes.Long().deserializer(); - - for (Long value : longs) { - assertEquals("Should get the original long after serialization and deserialization", - value, deserializer.deserialize(topic, serializer.serialize(topic, value))); - } - - assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(topic, serializer.serialize(topic, null))); - - serializer.close(); - deserializer.close(); - } - - @Test - public void shouldSerializeDeserializeFloat() { - final Float[] floats = new Float[]{ - 5678567.12312f, - -5678567.12341f - }; - final Serializer<Float> serializer = Serdes.Float().serializer(); - final Deserializer<Float> deserializer = Serdes.Float().deserializer(); - - for (final Float value : floats) { - assertThat("Should round-trip a float", - value, equalTo(deserializer.deserialize(topic, serializer.serialize(topic, value)))); - } - - serializer.close(); - deserializer.close(); - } - - @Test - public void floatSerializerShouldReturnNullForNull() { - final Serializer<Float> serializer = Serdes.Float().serializer(); - assertThat(serializer.serialize(topic, null), nullValue()); - serializer.close(); - } - - @Test - public void floatDeserializerShouldReturnNullForNull() { - final Deserializer<Float> deserializer = Serdes.Float().deserializer(); - assertThat(deserializer.deserialize(topic, null), nullValue()); - deserializer.close(); - } - - @Test + @Test(expected = SerializationException.class) public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() { - final Deserializer<Float> deserializer = Serdes.Float().deserializer(); - try { - deserializer.deserialize(topic, new byte[0]); - fail("Should have thrown a SerializationException because of zero input bytes"); - } catch (SerializationException e) { - // Ignore (there's no contract on the details of the exception) + try (Serde<Float> serde = Serdes.Float()) { + serde.deserializer().deserialize(topic, new byte[0]); } - deserializer.close(); } - @Test + @Test(expected = SerializationException.class) public void floatDeserializerShouldThrowSerializationExceptionOnTooFewBytes() { - final Deserializer<Float> deserializer = Serdes.Float().deserializer(); - try { - deserializer.deserialize(topic, new byte[3]); - fail("Should have thrown a SerializationException because of too few input bytes"); - } catch (SerializationException e) { - // Ignore (there's no contract on the details of the exception) + try (Serde<Float> serde = Serdes.Float()) { + serde.deserializer().deserialize(topic, new byte[3]); } - deserializer.close(); } - @Test + @Test(expected = SerializationException.class) public void floatDeserializerShouldThrowSerializationExceptionOnTooManyBytes() { - final Deserializer<Float> deserializer = Serdes.Float().deserializer(); - try { - deserializer.deserialize(topic, new byte[5]); - fail("Should have thrown a SerializationException because of too many input bytes"); - } catch (SerializationException e) { - // Ignore (there's no contract on the details of the exception) + try (Serde<Float> serde = Serdes.Float()) { + serde.deserializer().deserialize(topic, new byte[5]); } - deserializer.close(); } @Test public void floatSerdeShouldPreserveNaNValues() { - final int someNaNAsIntBits = 0x7f800001; - final float someNaN = Float.intBitsToFloat(someNaNAsIntBits); - final int anotherNaNAsIntBits = 0x7f800002; - final float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits); - - final Serde<Float> serde = Serdes.Float(); - // Because of NaN semantics we must assert based on the raw int bits. - final Float roundtrip = serde.deserializer().deserialize(topic, - serde.serializer().serialize(topic, someNaN)); - assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits)); - final Float otherRoundtrip = serde.deserializer().deserialize(topic, - serde.serializer().serialize(topic, anotherNaN)); - assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits)); - - serde.close(); - } - - @Test - public void testDoubleSerializer() { - Double[] doubles = new Double[]{ - 5678567.12312d, - -5678567.12341d - }; - - Serializer<Double> serializer = Serdes.Double().serializer(); - Deserializer<Double> deserializer = Serdes.Double().deserializer(); - - for (Double value : doubles) { - assertEquals("Should get the original double after serialization and deserialization", - value, deserializer.deserialize(topic, serializer.serialize(topic, value))); + int someNaNAsIntBits = 0x7f800001; + float someNaN = Float.intBitsToFloat(someNaNAsIntBits); + int anotherNaNAsIntBits = 0x7f800002; + float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits); + + try (Serde<Float> serde = Serdes.Float()) { + // Because of NaN semantics we must assert based on the raw int bits. + Float roundtrip = serde.deserializer().deserialize(topic, + serde.serializer().serialize(topic, someNaN)); + assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits)); + Float otherRoundtrip = serde.deserializer().deserialize(topic, + serde.serializer().serialize(topic, anotherNaN)); + assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits)); } - - assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(topic, serializer.serialize(topic, null))); - - serializer.close(); - deserializer.close(); - } - - @Test - public void testByteBufferSerializer() { - ByteBuffer buf = ByteBuffer.allocate(10); - buf.put("my string".getBytes()); - - Serializer<ByteBuffer> serializer = Serdes.ByteBuffer().serializer(); - Deserializer<ByteBuffer> deserializer = Serdes.ByteBuffer().deserializer(); - - assertEquals("Should get the original ByteBuffer after serialization and deserialization", - buf, deserializer.deserialize(topic, serializer.serialize(topic, buf))); - - assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(topic, serializer.serialize(topic, null))); - - serializer.close(); - deserializer.close(); } private Serde<String> getStringSerde(String encoder) {
