Repository: flink Updated Branches: refs/heads/release-1.6 f19337a7d -> 1ee705afa
[hotfix] [core] Align serialization methods in SimpleVersionedSerialization Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ee705af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ee705af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ee705af Branch: refs/heads/release-1.6 Commit: 1ee705afa7122e1b92fd3f1b12fe4c97c66ffd5b Parents: f19337a Author: Stephan Ewen <[email protected]> Authored: Fri Jul 20 17:24:52 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Jul 20 18:56:28 2018 +0200 ---------------------------------------------------------------------- .../core/io/SimpleVersionedSerialization.java | 33 ++++++-- .../io/SimpleVersionedSerializationTest.java | 81 ++++---------------- 2 files changed, 42 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java index 8bead11..2c5b68c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java @@ -110,7 +110,7 @@ public class SimpleVersionedSerialization { checkNotNull(datum, "datum"); final byte[] data = serializer.serialize(datum); - final byte[] versionAndData = new byte[data.length + 4]; + final byte[] versionAndData = new byte[data.length + 8]; final int version = serializer.getVersion(); versionAndData[0] = (byte) (version >> 24); @@ -118,8 +118,14 @@ public class SimpleVersionedSerialization { versionAndData[2] = (byte) (version >> 8); versionAndData[3] = (byte) version; + final int length = data.length; + versionAndData[4] = (byte) (length >> 24); + versionAndData[5] = (byte) (length >> 16); + versionAndData[6] = (byte) (length >> 8); + versionAndData[7] = (byte) length; + // move the data to the array - System.arraycopy(data, 0, versionAndData, 4, data.length); + System.arraycopy(data, 0, versionAndData, 8, data.length); return versionAndData; } @@ -142,14 +148,25 @@ public class SimpleVersionedSerialization { checkNotNull(bytes, "bytes"); checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)"); - final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length); + final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length); final int version = ((bytes[0] & 0xff) << 24) | - ((bytes[1] & 0xff) << 16) | - ((bytes[2] & 0xff) << 8) | - (bytes[3] & 0xff); - - return serializer.deserialize(version, dataOnly); + ((bytes[1] & 0xff) << 16) | + ((bytes[2] & 0xff) << 8) | + (bytes[3] & 0xff); + + final int length = + ((bytes[4] & 0xff) << 24) | + ((bytes[5] & 0xff) << 16) | + ((bytes[6] & 0xff) << 8) | + (bytes[7] & 0xff); + + if (length == dataOnly.length) { + return serializer.deserialize(version, dataOnly); + } + else { + throw new IOException("Corrupt data, conflicting lengths. Length fields: " + length + ", data: " + dataOnly.length); + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java index 89a6b27..116d37c 100644 --- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java @@ -26,8 +26,8 @@ import org.junit.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; /** * Tests for the {@link SimpleVersionedSerialization} class. @@ -35,7 +35,7 @@ import static org.junit.Assert.assertNotNull; public class SimpleVersionedSerializationTest { @Test - public void testStreamSerializationRoundTrip() throws IOException { + public void testSerializationRoundTrip() throws IOException { final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() { private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes @@ -60,14 +60,20 @@ public class SimpleVersionedSerializationTest { final String testString = "dugfakgs"; final DataOutputSerializer out = new DataOutputSerializer(32); SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out); + final byte[] outBytes = out.getCopyOfBuffer(); + + final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString); + assertArrayEquals(bytes, outBytes); - final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + final DataInputDeserializer in = new DataInputDeserializer(bytes); final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in); + final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, outBytes); assertEquals(testString, deserialized); + assertEquals(testString, deserializedFromBytes); } @Test - public void testStreamSerializeEmpty() throws IOException { + public void testSerializeEmpty() throws IOException { final String testString = "beeeep!"; SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() { @@ -92,68 +98,15 @@ public class SimpleVersionedSerializationTest { final DataOutputSerializer out = new DataOutputSerializer(32); SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out); + final byte[] outBytes = out.getCopyOfBuffer(); - final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); - assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in)); - } - - @Test - public void testSerializationRoundTrip() throws IOException { - final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() { - - private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes - - @Override - public int getVersion() { - return VERSION; - } - - @Override - public byte[] serialize(String str) throws IOException { - return str.getBytes(StandardCharsets.UTF_8); - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - assertEquals(VERSION, version); - return new String(serialized, StandardCharsets.UTF_8); - } - }; - - final String testString = "dugfakgs"; - byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString); + final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc"); + assertArrayEquals(bytes, outBytes); - final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized); + final DataInputDeserializer in = new DataInputDeserializer(bytes); + final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in); + final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, outBytes); assertEquals(testString, deserialized); - } - - @Test - public void testSerializeEmpty() throws IOException { - final String testString = "beeeep!"; - - SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() { - - @Override - public int getVersion() { - return 42; - } - - @Override - public byte[] serialize(String obj) throws IOException { - return new byte[0]; - } - - @Override - public String deserialize(int version, byte[] serialized) throws IOException { - assertEquals(42, version); - assertEquals(0, serialized.length); - return testString; - } - }; - - byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc"); - assertNotNull(serialized); - - assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized)); + assertEquals(testString, deserializedFromBytes); } }
