[ https://issues.apache.org/jira/browse/BEAM-9743?focusedWorklogId=422857&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422857 ]

ASF GitHub Bot logged work on BEAM-9743:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Apr/20 17:16
            Start Date: 15/Apr/20 17:16
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r408996520
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
 ##########
 @@ -440,4 +456,115 @@ public void processElement(ProcessContext c) {
       c.output(c.element().getBytes(Charsets.UTF_8));
     }
   }
+
+  static boolean maybeThisTime() {
+    return ThreadLocalRandom.current().nextBoolean();
+  }
+
+  static class PickyReadChannel extends FilterInputStream implements ReadableByteChannel {
+    protected PickyReadChannel(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (!maybeThisTime() || !dst.hasRemaining()) {
+        return 0;
+      }
+      int n = read();
+      if (n == -1) {
+        return -1;
+      }
+      dst.put((byte) n);
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class PickyWriteChannel extends FilterOutputStream implements WritableByteChannel {
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public PickyWriteChannel(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      if (!maybeThisTime() || !src.hasRemaining()) {
+        return 0;
+      }
+      write(src.get());
+      return 1;
+    }
+
+    @Override
+    public boolean isOpen() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Test
+  public void testReadFully() throws IOException {
+    byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
+    ReadableByteChannel chan = new PickyReadChannel(new ByteArrayInputStream(data));
+
+    ByteBuffer buffer = ByteBuffer.allocate(data.length);
+    TFRecordCodec.readFully(chan, buffer);
+
+    assertArrayEquals(data, buffer.array());
+  }
+
+  @Test(expected = IOException.class)
 
 Review comment:
   You want to ensure that the exception is caused by a truncated read, so it is
useful to check that the message contains part of what you expect it to say,
such as `expected` and `but got`. Check out
https://junit.org/junit4/javadoc/4.12/org/junit/rules/ExpectedException.html or
something similar.
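One way to act on this suggestion is sketched below in plain Java so it runs without JUnit. The hypothetical `readFully` helper loops until the buffer is full and, on a premature end of stream, throws an `IOException` whose message contains `expected` and `but got`. The check then asserts on that message instead of only on the exception type. The message format is an assumption and is not necessarily what the PR's `TFRecordCodec.readFully` produces. With the JUnit 4.12 `ExpectedException` rule, the equivalent would be `thrown.expect(IOException.class)` followed by `thrown.expectMessage("but got")`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class TruncatedReadCheck {

  // Hypothetical helper (assumption, not the PR's code): keep calling read()
  // until the buffer is full. A channel may legally return 0 bytes, so only
  // -1 (end of stream) is treated as a truncated read.
  static void readFully(ReadableByteChannel chan, ByteBuffer dst) throws IOException {
    int expected = dst.remaining();
    while (dst.hasRemaining()) {
      if (chan.read(dst) == -1) {
        int got = expected - dst.remaining();
        throw new IOException(
            String.format("expected %d bytes, but got %d before end of stream", expected, got));
      }
    }
  }

  public static void main(String[] args) {
    byte[] data = "Hello".getBytes(StandardCharsets.UTF_8);
    ReadableByteChannel chan = Channels.newChannel(new ByteArrayInputStream(data));
    // Ask for more bytes than the stream holds to force a truncated read.
    ByteBuffer buffer = ByteBuffer.allocate(data.length + 3);
    try {
      readFully(chan, buffer);
      throw new AssertionError("expected an IOException for a truncated read");
    } catch (IOException e) {
      // Check the message, not just the type, so an unrelated IOException fails.
      String msg = e.getMessage();
      if (!msg.contains("expected") || !msg.contains("but got")) {
        throw new AssertionError("unexpected message: " + msg);
      }
      System.out.println("truncated read detected: " + msg);
    }
  }
}
```

Matching on a stable fragment such as `but got` keeps the test meaningful without coupling it to the exact byte counts in the message.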
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 422857)
    Time Spent: 2h 20m  (was: 2h 10m)

> TFRecordCodec not attempt to fully read/write
> ---------------------------------------------
>
>                 Key: BEAM-9743
>                 URL: https://issues.apache.org/jira/browse/BEAM-9743
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-tfrecord, sdk-java-core
>            Reporter: Kyoungha Min
>            Assignee: Kyoungha Min
>            Priority: Critical
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The same issue has been reported before and those issues were marked resolved,
> but parts of it still remain:
> https://issues.apache.org/jira/browse/BEAM-5412?jql=text%20~%20%22tfrecord%22
>  
> Issue #1: TFRecordCodec only tries once to read the header/footer. This is
> likely to fail near the end of the channel's buffer.
> Issue #2: (minor) TFRecordCodec currently does not check how many bytes it
> writes.
>  
> It seems to happen only with Zstd compression (or any other picky input stream
> that refuses to read fully); ZstdInputStream seems very picky about handing out
> data.
> The affected code is at:
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L672]
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L699]
>  
> This is less of a problem within Beam itself (all, or most, of the
> WritableByteChannels in beam-java-sdk-core are backed by some OutputStream),
> but the code still does not follow the WritableByteChannel specification:
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L720-L727]
>  
> The ReadableByteChannel/WritableByteChannel Javadoc specifies that they are not
> required to read/write fully and may, from time to time, read/write fewer bytes
> than requested or none at all.
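For Issue #2 and the `WritableByteChannel` contract, here is a minimal sketch of a write loop built around a hypothetical `writeFully` helper, which is not necessarily what the PR adds. It keeps calling `write` until the buffer is drained, because a single call may consume only some of the bytes, or none. The `PickyChannel` class is likewise an illustrative stand-in modeled on the `PickyWriteChannel` test double in the review: it accepts at most one byte per call and randomly accepts zero.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class WriteFullySketch {

  // Hypothetical helper (assumption): one write() may consume only part of
  // src, or nothing, so loop until every remaining byte has been accepted.
  static void writeFully(WritableByteChannel chan, ByteBuffer src) throws IOException {
    while (src.hasRemaining()) {
      chan.write(src);
    }
  }

  // Illustrative test double: accepts at most one byte per call and
  // randomly accepts none, which the WritableByteChannel contract permits.
  static class PickyChannel implements WritableByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    @Override
    public int write(ByteBuffer src) {
      if (!src.hasRemaining() || !ThreadLocalRandom.current().nextBoolean()) {
        return 0;
      }
      sink.write(src.get());
      return 1;
    }

    @Override
    public boolean isOpen() {
      return true;
    }

    @Override
    public void close() {}
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "Hello World".getBytes(StandardCharsets.UTF_8);
    PickyChannel chan = new PickyChannel();
    // A single chan.write(...) stores at most one byte; writeFully stores all of them.
    writeFully(chan, ByteBuffer.wrap(data));
    System.out.println(new String(chan.sink.toByteArray(), StandardCharsets.UTF_8));
  }
}
```

Running `main` prints `Hello World`. On a genuinely non-blocking channel, this loop would busy-wait while the channel accepts zero bytes. The sketch only targets the blocking, occasionally-short writes described in the issue.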



--
This message was sent by Atlassian Jira
(v8.3.4#803005)