Acesine commented on a change in pull request #14042:
URL: https://github.com/apache/flink/pull/14042#discussion_r523448225



##########
File path: 
flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
##########
@@ -62,16 +62,23 @@ public void write(DataOutputView out) throws IOException {
         */
        public final void read(InputStream inputStream) throws IOException {
                byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
-               inputStream.read(tmp);
+               int totalRead = 0;

Review comment:
       Moved to org.apache.flink.util.IOUtils, added a few UT

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/io/PostVersionedIOReadableWritableTest.java
##########
@@ -35,52 +36,92 @@
 
        @Test
        public void testReadVersioned() throws IOException {
+               byte[] payload = "test-data".getBytes();
+               byte[] serialized = 
serializeWithPostVersionedReadableWritable(payload);
+               byte[] restored = 
restoreWithPostVersionedReadableWritable(serialized, payload.length);
 
-               String payload = "test-data";
-               TestPostVersionedReadableWritable versionedReadableWritable = 
new TestPostVersionedReadableWritable(payload);
+               Assert.assertArrayEquals(payload, restored);
+       }
+
+       @Test
+       public void testReadNonVersioned() throws IOException {
+               byte[] preVersionedPayload = new byte[]{0x00, 0x00, 0x02, 0x33};
+               byte[] serialized = 
serializeWithNonVersionedReadableWritable(preVersionedPayload);
+               byte[] restored = 
restoreWithPostVersionedReadableWritable(serialized, 
preVersionedPayload.length);
+
+               Assert.assertArrayEquals(preVersionedPayload, restored);
+       }
+
+       @Test
+       public void testReadNonVersionedWithLongPayload() throws IOException {
+               byte[] preVersionedPayload = "test-data".getBytes();
+               byte[] serialized = 
serializeWithNonVersionedReadableWritable(preVersionedPayload);
+               byte[] restored = 
restoreWithPostVersionedReadableWritable(serialized, 
preVersionedPayload.length);
+
+               Assert.assertArrayEquals(preVersionedPayload, restored);
+       }
+
+       @Test
+       public void testReadNonVersionedWithShortPayload() throws IOException {

Review comment:
       Thanks for reviewing. Good catch as empty payload test reveals an issue 
around uses of PushbackInputStream. Fixed in new revision.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to