[ https://issues.apache.org/jira/browse/BEAM-9743?focusedWorklogId=420719&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420719 ]

ASF GitHub Bot logged work on BEAM-9743:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/20 09:28
            Start Date: 11/Apr/20 09:28
    Worklog Time Spent: 10m 
      Work Description: lukemin89 commented on pull request #11397: [BEAM-9743] Fix TFRecordCodec to try harder to read/write
URL: https://github.com/apache/beam/pull/11397#discussion_r407041130
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
 ##########
 @@ -669,34 +670,31 @@ public int recordLength(byte[] data) {
 
     public @Nullable byte[] read(ReadableByteChannel inChannel) throws IOException {
       header.clear();
-      int headerBytes = inChannel.read(header);
-      if (headerBytes <= 0) {
+      int firstRead = read(inChannel, header);
+      if (firstRead == 0) {
         return null;
       }
-      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
 
       header.rewind();
-      long length = header.getLong();
-      long lengthHash = hashLong(length);
+      long length64 = header.getLong();
+      long lengthHash = hashLong(length64);
       int maskedCrc32OfLength = header.getInt();
       if (lengthHash != maskedCrc32OfLength) {
         throw new IOException(
             String.format(
                 "Mismatch of length mask when reading a record. Expected %d but received %d.",
                 maskedCrc32OfLength, lengthHash));
       }
-
-      ByteBuffer data = ByteBuffer.allocate((int) length);
-      while (data.hasRemaining() && inChannel.read(data) >= 0) {}
-      if (data.hasRemaining()) {
-        throw new IOException(
-            String.format(
-                "EOF while reading record of length %d. Read only %d bytes. Input might be truncated.",
-                length, data.position()));
+      int length = (int) length64;
+      if (length != length64) {
+        throw new IOException(String.format("length overflow %d", length64));
       }
 
 Review comment:
   Minor, but a required check for integer overflow. I'm not sure whether this
 case can actually occur, but if I remember correctly, other languages can create
 and write records larger than 2 GB. (Maybe Java could handle that with a direct buffer?)
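
A minimal sketch of the overflow check discussed in the review comment, assuming the 64-bit length has already been read from the header and its masked CRC verified. The class and method names (LengthCheck, checkedLength) are hypothetical, not Beam APIs. On the direct-buffer question: ByteBuffer.allocateDirect also takes an int capacity, so a single direct buffer cannot exceed Integer.MAX_VALUE bytes either. Math.toIntExact(long) is a standard-library alternative that throws ArithmeticException on the same overflow.

```java
import java.io.IOException;

final class LengthCheck {
  // Hypothetical helper: narrows the 64-bit length field of a TFRecord header
  // to an int, failing loudly instead of silently truncating.
  static int checkedLength(long length64) throws IOException {
    int length = (int) length64;
    // Same round-trip test as the diff: values that do not fit in an int
    // change when cast, so the comparison detects the overflow.
    if (length != length64) {
      throw new IOException(String.format("length overflow %d", length64));
    }
    // Extra guard (an assumption, not in the diff): the cast-and-compare test
    // accepts small negative values such as -1L, which would otherwise reach
    // ByteBuffer.allocate and fail there with IllegalArgumentException.
    if (length < 0) {
      throw new IOException(String.format("negative record length %d", length64));
    }
    return length;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(checkedLength(1024L)); // 1024
    try {
      checkedLength(3L * 1024 * 1024 * 1024); // 3 GiB does not fit in an int
    } catch (IOException e) {
      System.out.println(e.getMessage()); // length overflow 3221225472
    }
  }
}
```

Throwing IOException rather than ArithmeticException keeps the failure in the same exception type the codec already uses for malformed input.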
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 420719)
    Time Spent: 20m  (was: 10m)

> TFRecordCodec does not attempt to fully read/write
> --------------------------------------------------
>
>                 Key: BEAM-9743
>                 URL: https://issues.apache.org/jira/browse/BEAM-9743
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Kyoungha Min
>            Assignee: Kyoungha Min
>            Priority: Critical
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> This seems to happen only with Zstd compression (or any other picky input
> stream that refuses to fill the buffer in a single read). Zstd seems very
> picky about handing out data.
> The affected parts are:
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L672]
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L699]
>  
> The write path is less of a problem within Beam (all WritableByteChannels
> in beam-java-sdk-core are backed by an OutputStream), but it still does not
> follow the WritableByteChannel specification:
> [https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L720-L727]
>  
> The ReadableByteChannel/WritableByteChannel Javadoc specifies that a single
> read/write call is not guaranteed to transfer all requested bytes; a channel
> may transfer fewer bytes, or none at all, from time to time.
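
A minimal sketch of the "try harder" loops the issue asks for, assuming blocking channels (a non-blocking channel that keeps returning 0 would make these loops spin). The names readFully, writeFully and PickyChannel are hypothetical, not Beam APIs, and readFully is not the read(inChannel, header) helper from the PR, whose body is not shown here. PickyChannel is a test double that hands out at most a few bytes per call to imitate the Zstd behaviour described in the issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

final class ChannelLoops {
  // Hypothetical helper: calls read() until dst is full or the channel reports
  // end-of-stream (-1). Returns the number of bytes actually read, so the
  // caller can tell a clean EOF (0) from a truncated record (0 < n < capacity).
  static int readFully(ReadableByteChannel in, ByteBuffer dst) throws IOException {
    int total = 0;
    while (dst.hasRemaining()) {
      int n = in.read(dst);
      if (n < 0) {
        break; // end of stream
      }
      total += n;
    }
    return total;
  }

  // Hypothetical helper: calls write() until every byte in src is consumed.
  static void writeFully(WritableByteChannel out, ByteBuffer src) throws IOException {
    while (src.hasRemaining()) {
      out.write(src);
    }
  }

  // Test double that returns at most maxPerRead bytes per call, imitating a
  // stream that refuses to fill the buffer in one go.
  static final class PickyChannel implements ReadableByteChannel {
    private final ByteBuffer source;
    private final int maxPerRead;
    private boolean open = true;

    PickyChannel(byte[] data, int maxPerRead) {
      this.source = ByteBuffer.wrap(data);
      this.maxPerRead = maxPerRead;
    }

    @Override
    public int read(ByteBuffer dst) {
      if (!source.hasRemaining()) {
        return -1;
      }
      int n = Math.min(maxPerRead, Math.min(dst.remaining(), source.remaining()));
      ByteBuffer chunk = source.duplicate();
      chunk.limit(chunk.position() + n);
      dst.put(chunk);
      source.position(source.position() + n);
      return n;
    }

    @Override
    public boolean isOpen() {
      return open;
    }

    @Override
    public void close() {
      open = false;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] header = new byte[12]; // same size as a TFRecord header
    for (int i = 0; i < header.length; i++) {
      header[i] = (byte) i;
    }

    // A single read() only gets part of the header.
    ByteBuffer once = ByteBuffer.allocate(12);
    System.out.println("single read: " + new PickyChannel(header, 5).read(once)); // single read: 5

    // Looping until the buffer is full recovers all 12 bytes.
    ByteBuffer full = ByteBuffer.allocate(12);
    System.out.println("readFully: " + readFully(new PickyChannel(header, 5), full)); // readFully: 12

    // The write side uses the same pattern.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    writeFully(Channels.newChannel(sink), ByteBuffer.wrap(header));
    System.out.println("written: " + sink.size()); // written: 12
  }
}
```

Returning a byte count instead of throwing lets the caller decide policy: zero bytes at a record boundary can mean a clean end of input, while a partial header or body indicates truncation.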



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
