Repository: flink
Updated Branches:
  refs/heads/release-1.6 f19337a7d -> 1ee705afa


[hotfix] [core] Align serialization methods in SimpleVersionedSerialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ee705af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ee705af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ee705af

Branch: refs/heads/release-1.6
Commit: 1ee705afa7122e1b92fd3f1b12fe4c97c66ffd5b
Parents: f19337a
Author: Stephan Ewen <[email protected]>
Authored: Fri Jul 20 17:24:52 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Jul 20 18:56:28 2018 +0200

----------------------------------------------------------------------
 .../core/io/SimpleVersionedSerialization.java   | 33 ++++++--
 .../io/SimpleVersionedSerializationTest.java    | 81 ++++----------------
 2 files changed, 42 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 8bead11..2c5b68c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -110,7 +110,7 @@ public class SimpleVersionedSerialization {
                checkNotNull(datum, "datum");
 
                final byte[] data = serializer.serialize(datum);
-               final byte[] versionAndData = new byte[data.length + 4];
+               final byte[] versionAndData = new byte[data.length + 8];
 
                final int version = serializer.getVersion();
                versionAndData[0] = (byte) (version >> 24);
@@ -118,8 +118,14 @@ public class SimpleVersionedSerialization {
                versionAndData[2] = (byte) (version >>  8);
                versionAndData[3] = (byte)  version;
 
+               final int length = data.length;
+               versionAndData[4] = (byte) (length >> 24);
+               versionAndData[5] = (byte) (length >> 16);
+               versionAndData[6] = (byte) (length >>  8);
+               versionAndData[7] = (byte)  length;
+
                // move the data to the array
-               System.arraycopy(data, 0, versionAndData, 4, data.length);
+               System.arraycopy(data, 0, versionAndData, 8, data.length);
 
                return versionAndData;
        }
@@ -142,14 +148,25 @@ public class SimpleVersionedSerialization {
                checkNotNull(bytes, "bytes");
                checkArgument(bytes.length >= 4, "byte array below minimum 
length (4 bytes)");
 
-               final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, 
bytes.length);
+               final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, 
bytes.length);
                final int version =
                                ((bytes[0] & 0xff) << 24) |
-                                               ((bytes[1] & 0xff) << 16) |
-                                               ((bytes[2] & 0xff) <<  8) |
-                                               (bytes[3] & 0xff);
-
-               return serializer.deserialize(version, dataOnly);
+                               ((bytes[1] & 0xff) << 16) |
+                               ((bytes[2] & 0xff) <<  8) |
+                               (bytes[3] & 0xff);
+
+               final int length =
+                               ((bytes[4] & 0xff) << 24) |
+                               ((bytes[5] & 0xff) << 16) |
+                               ((bytes[6] & 0xff) <<  8) |
+                               (bytes[7] & 0xff);
+
+               if (length == dataOnly.length) {
+                       return serializer.deserialize(version, dataOnly);
+               }
+               else {
+                       throw new IOException("Corrupt data, conflicting 
lengths. Length fields: " + length + ", data: " + dataOnly.length);
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 89a6b27..116d37c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -26,8 +26,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 /**
  * Tests for the {@link SimpleVersionedSerialization} class.
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertNotNull;
 public class SimpleVersionedSerializationTest {
 
        @Test
-       public void testStreamSerializationRoundTrip() throws IOException {
+       public void testSerializationRoundTrip() throws IOException {
                final SimpleVersionedSerializer<String> utfEncoder = new 
SimpleVersionedSerializer<String>() {
 
                        private static final int VERSION = Integer.MAX_VALUE / 
2; // version should occupy many bytes
@@ -60,14 +60,20 @@ public class SimpleVersionedSerializationTest {
                final String testString = "dugfakgs";
                final DataOutputSerializer out = new DataOutputSerializer(32);
                
SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, 
out);
+               final byte[] outBytes = out.getCopyOfBuffer();
+
+               final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+               assertArrayEquals(bytes, outBytes);
 
-               final DataInputDeserializer in = new 
DataInputDeserializer(out.getCopyOfBuffer());
+               final DataInputDeserializer in = new 
DataInputDeserializer(bytes);
                final String deserialized = 
SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
+               final String deserializedFromBytes = 
SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, outBytes);
                assertEquals(testString, deserialized);
+               assertEquals(testString, deserializedFromBytes);
        }
 
        @Test
-       public void testStreamSerializeEmpty() throws IOException {
+       public void testSerializeEmpty() throws IOException {
                final String testString = "beeeep!";
 
                SimpleVersionedSerializer<String> emptySerializer = new 
SimpleVersionedSerializer<String>() {
@@ -92,68 +98,15 @@ public class SimpleVersionedSerializationTest {
 
                final DataOutputSerializer out = new DataOutputSerializer(32);
                
SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", 
out);
+               final byte[] outBytes = out.getCopyOfBuffer();
 
-               final DataInputDeserializer in = new 
DataInputDeserializer(out.getCopyOfBuffer());
-               assertEquals(testString, 
SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
-       }
-
-       @Test
-       public void testSerializationRoundTrip() throws IOException {
-               final SimpleVersionedSerializer<String> utfEncoder = new 
SimpleVersionedSerializer<String>() {
-
-                       private static final int VERSION = Integer.MAX_VALUE / 
2; // version should occupy many bytes
-
-                       @Override
-                       public int getVersion() {
-                               return VERSION;
-                       }
-
-                       @Override
-                       public byte[] serialize(String str) throws IOException {
-                               return str.getBytes(StandardCharsets.UTF_8);
-                       }
-
-                       @Override
-                       public String deserialize(int version, byte[] 
serialized) throws IOException {
-                               assertEquals(VERSION, version);
-                               return new String(serialized, 
StandardCharsets.UTF_8);
-                       }
-               };
-
-               final String testString = "dugfakgs";
-               byte[] serialized = 
SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+               final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
+               assertArrayEquals(bytes, outBytes);
 
-               final String deserialized = 
SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
+               final DataInputDeserializer in = new 
DataInputDeserializer(bytes);
+               final String deserialized = 
SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in);
+               final String deserializedFromBytes = 
SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, 
outBytes);
                assertEquals(testString, deserialized);
-       }
-
-       @Test
-       public void testSerializeEmpty() throws IOException {
-               final String testString = "beeeep!";
-
-               SimpleVersionedSerializer<String> emptySerializer = new 
SimpleVersionedSerializer<String>() {
-
-                       @Override
-                       public int getVersion() {
-                               return 42;
-                       }
-
-                       @Override
-                       public byte[] serialize(String obj) throws IOException {
-                               return new byte[0];
-                       }
-
-                       @Override
-                       public String deserialize(int version, byte[] 
serialized) throws IOException {
-                               assertEquals(42, version);
-                               assertEquals(0, serialized.length);
-                               return testString;
-                       }
-               };
-
-               byte[] serialized = 
SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
-               assertNotNull(serialized);
-
-               assertEquals(testString, 
SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, 
serialized));
+               assertEquals(testString, deserializedFromBytes);
        }
 }

Reply via email to