[ 
https://issues.apache.org/jira/browse/BEAM-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LI HAO updated BEAM-7695:
-------------------------
    Description: 
Reading TFRecord files from HDFS fails with an exception under these conditions:
 * A single TFRecord file is larger than 3 GB.
 * The total input size is larger than 1 TB.
 * Beam 2.13.0 + FlinkRunner 2.13.0 + Java 1.8; 2.11.0 and 2.12.0 show the same problem.

The dependencies (from build.gradle):
{code:java}
dependencies {
    // This dependency is found on compile classpath of this component and consumers.
    //implementation 'com.google.guava:guava:27.0.1-jre'
    compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
    compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
    compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
    compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
    //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
    compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
    compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
    compile "org.apache.hadoop:hadoop-common:2.7.3"
    compile "org.apache.hadoop:hadoop-client:2.7.3"
    compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
    compile "org.tensorflow:proto:1.13.1"
    compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"

    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
}
{code}
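For reference, the read path is essentially the pattern below (a minimal sketch, not the actual job code in App.testTfrecordQIYU; the hdfs:// glob, the option wiring, and the Count step are assumptions for illustration):
{code:java}
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;

public class ReadTfRecordsFromHdfs {
  public static void main(String[] args) {
    // HadoopFileSystemOptions lets hdfs:// paths resolve through
    // beam-sdks-java-io-hadoop-file-system.
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);

    // Each element is the raw payload of one TFRecord (e.g. a serialized tf.Example).
    PCollection<byte[]> records =
        p.apply("ReadTFRecords", TFRecordIO.read().from("hdfs:///path/to/tfrecords/part-*"));

    // Any downstream transform forces the read; counting is enough to hit the error.
    records.apply("CountRecords", Count.globally());

    p.run().waitUntilFinish();
  }
}
{code}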
The error message:

{code:java}
------------------------------------------------------------

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
    at avazu.data.transform.App.main(App.java:744)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 12 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
    ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 26 more
Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
    at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:745)
{code}
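For context, the check that fails is the TFRecord length-mask validation. Each record in a TFRecord file is framed as an 8-byte little-endian length, a masked CRC32C of those length bytes, the payload, and a masked CRC32C of the payload. The sketch below shows that check in rough form (based on the published TFRecord format, not Beam's TFRecordCodec source; the class name, method names, and the use of Guava's Hashing.crc32c() are assumptions). A mismatch such as "Expected 808268081 but received 1769712859" means the reader was not positioned at a record boundary, or the bytes are corrupted, which is consistent with the error appearing only once the input is large enough to be split across readers.
{code:java}
import com.google.common.hash.Hashing; // assumes Guava on the classpath
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class TfRecordFramingSketch {
  // Mask delta from the TFRecord format specification.
  private static final int MASK_DELTA = 0xa282ead8;

  // masked_crc = rotate_right_15(crc32c(bytes)) + MASK_DELTA, all mod 2^32.
  static int maskedCrc32c(byte[] bytes) {
    int crc = Hashing.crc32c().hashBytes(bytes).asInt();
    return ((crc >>> 15) | (crc << 17)) + MASK_DELTA;
  }

  // Validate the 12-byte record header: 8-byte little-endian length + 4-byte masked CRC of it.
  static long readAndCheckLength(ByteBuffer header) {
    header.order(ByteOrder.LITTLE_ENDIAN);
    byte[] lengthBytes = new byte[8];
    header.get(lengthBytes);
    int expectedMask = maskedCrc32c(lengthBytes);
    int storedMask = header.getInt();
    if (storedMask != expectedMask) {
      // This is the condition behind "Mismatch of length mask when reading a record".
      throw new IllegalStateException(
          "Mismatch of length mask: expected " + expectedMask + " but received " + storedMask);
    }
    return ByteBuffer.wrap(lengthBytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
  }
}
{code}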



> Read TFRecord Files from hdfs will meet exception if file size is large
> -----------------------------------------------------------------------
>
>                 Key: BEAM-7695
>                 URL: https://issues.apache.org/jira/browse/BEAM-7695
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-tfrecord
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0, 
> 2.10.0, 2.11.0, 2.12.0, 2.13.0
>            Reporter: LI HAO
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
