damccorm opened a new issue, #19644:
URL: https://github.com/apache/beam/issues/19644

   Reading TFRecord files stored in HDFS fails with an error.
    * Each single TFRecord file is larger than 3 GB.
    * The total input size is larger than 1 TB.
    * Using Beam 2.13.0, FlinkRunner 2.13.0, and Java 1.8. I also tested 
2.11.0/2.12.0 and hit the same problem.
   
   The dependencies (in build.gradle):
   ```
   
   dependencies {
       // This dependency is found on compile classpath of this component and consumers.
       //implementation 'com.google.guava:guava:27.0.1-jre'
       compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
       compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
       compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
       compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
       //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
       compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
       compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
       compile "org.apache.hadoop:hadoop-common:2.7.3"
       compile "org.apache.hadoop:hadoop-client:2.7.3"
       compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
       compile "org.tensorflow:proto:1.13.1"
       compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"
       // Use JUnit test framework
       testImplementation 'junit:junit:4.12'
   }
   ```
   
   The error msg:
   
    
   ```
   
   ------------------------------------------------------------
   The program finished with the following exception:

   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
       at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
       at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
       at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:422)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
   Caused by: java.lang.RuntimeException: Pipeline execution failed
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
       at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
       at avazu.data.transform.App.main(App.java:744)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
       ... 12 more
   Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
       at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
       at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
       at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
       ... 21 more
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
       at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
       at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
       ... 26 more
   Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
       at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
       at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
       at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
       at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
       at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
       at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
       at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
       at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
       at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
       at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
       at java.lang.Thread.run(Thread.java:745)
   
   ```
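
For context on the message at the bottom of the trace: as far as the TFRecord format is documented, each record starts with a 12-byte header, an 8-byte little-endian length followed by a 4-byte masked CRC32C of those length bytes. The sketch below is a minimal illustration of that check, not Beam's code. `TfRecordHeaderSketch` and its methods are hypothetical names, and it uses `java.util.zip.CRC32C`, which assumes Java 9 or later rather than the Java 1.8 mentioned above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

/** Hypothetical illustration of the TFRecord header check; not part of Beam. */
final class TfRecordHeaderSketch {
  // 8-byte record length + 4-byte masked CRC32C of the length bytes.
  static final int HEADER_LEN = 12;

  private TfRecordHeaderSketch() {}

  /** CRC32C of the given bytes, rotated and offset as the TFRecord format masks it. */
  static int maskedCrc32c(byte[] bytes, int off, int len) {
    CRC32C crc = new CRC32C();
    crc.update(bytes, off, len);
    int c = (int) crc.getValue();
    return ((c >>> 15) | (c << 17)) + 0xa282ead8;
  }

  /** Builds a 12-byte header for a record of the given payload length. */
  static byte[] encodeHeader(long length) {
    ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(length);
    buf.putInt(maskedCrc32c(buf.array(), 0, 8));
    return buf.array();
  }

  /** Returns true if the stored length mask matches the one computed from the length bytes. */
  static boolean headerIsValid(byte[] header) {
    ByteBuffer buf = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
    buf.getLong();              // skip the 8 length bytes
    int stored = buf.getInt();  // mask written by the producer
    return stored == maskedCrc32c(header, 0, 8);
  }

  public static void main(String[] args) {
    byte[] header = encodeHeader(3L * 1024 * 1024 * 1024);
    System.out.println(headerIsValid(header)); // prints "true"

    header[0] ^= 0x01; // flip one bit of the length
    System.out.println(headerIsValid(header)); // prints "false"
  }
}
```

If a reader starts parsing at the wrong offset, for example because an earlier read returned fewer bytes than expected, the bytes it treats as a header will most likely fail this check, which would be consistent with the error reported here.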
   
   How to fix?
    * I have already fixed the bug locally with the following change.
    * I will refine the change and submit it as a patch.
   
    
   ```
   
   diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   index 96a753a..484a7cb 100644
   --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   @@ -631,8 +631,16 @@ public class TFRecordIO {
          int headerBytes = inChannel.read(header);
          if (headerBytes <= 0) {
            return null;
   +      } else if (headerBytes != HEADER_LEN) {
   +        while (header.hasRemaining() && inChannel.read(header) >= 0) {}
   +        if (header.hasRemaining()) {
   +          throw new IOException(String.format(
   +              "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Not a valid TFRecord. Fewer than 12 bytes.",
   +              HEADER_LEN, header.position()));
   +        }
   +      } else {
   +
          }
   -      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
    
          header.rewind();
          long length = header.getLong();
   @@ -655,7 +663,12 @@ public class TFRecordIO {
          }
    
          footer.clear();
   -      inChannel.read(footer);
   +      while (footer.hasRemaining() && inChannel.read(footer) >= 0) {}
   +      if (footer.hasRemaining()) {
   +        throw new IOException(String.format(
   +            "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Footer error.",
   +            FOOTER_LEN, footer.position()));
   +      }
          footer.rewind();
    
          int maskedCrc32OfData = footer.getInt();
   
   ```
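
The idea behind the change is that a single `ReadableByteChannel.read` call may return fewer bytes than the buffer has room for, so the header and footer have to be read in a loop. Below is a minimal standalone sketch of that loop, assuming a blocking channel. `ChannelReads`, `readFully`, and `oneByteAtATime` are hypothetical names used only for illustration and are not part of Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical helper, not part of Beam: fills a buffer despite short reads. */
final class ChannelReads {
  private ChannelReads() {}

  /**
   * Reads until buf is full or the channel reaches end of stream.
   * Returns the number of bytes this call added to buf.
   * Assumes a blocking channel; a non-blocking one could return 0 repeatedly.
   */
  static int readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    int start = buf.position();
    while (buf.hasRemaining()) {
      if (in.read(buf) < 0) {
        break; // end of stream before the buffer was filled
      }
    }
    return buf.position() - start;
  }

  /** Test double: a channel that hands out at most one byte per read() call. */
  static ReadableByteChannel oneByteAtATime(byte[] data) {
    ReadableByteChannel delegate = Channels.newChannel(new ByteArrayInputStream(data));
    return new ReadableByteChannel() {
      @Override
      public int read(ByteBuffer dst) throws IOException {
        if (!dst.hasRemaining()) {
          return 0;
        }
        ByteBuffer one = ByteBuffer.allocate(1);
        int n = delegate.read(one);
        if (n <= 0) {
          return n;
        }
        one.flip();
        dst.put(one);
        return n;
      }

      @Override
      public boolean isOpen() {
        return delegate.isOpen();
      }

      @Override
      public void close() throws IOException {
        delegate.close();
      }
    };
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer header = ByteBuffer.allocate(12);
    int n = readFully(oneByteAtATime(new byte[12]), header);
    System.out.println("read " + n + " bytes, full=" + !header.hasRemaining());
    // prints "read 12 bytes, full=true"
  }
}
```

A caller would treat a return value of 0 as a clean end of input and a partially filled buffer as a truncated record, which mirrors the two cases the patch distinguishes.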
   
    
   
   Imported from Jira 
[BEAM-7695](https://issues.apache.org/jira/browse/BEAM-7695). Original Jira may 
contain additional context.
   Reported by: silenceli.

