[ 
https://issues.apache.org/jira/browse/FLINK-14037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926317#comment-16926317
 ] 

Zhu Zhu edited comment on FLINK-14037 at 9/10/19 4:16 AM:
----------------------------------------------------------

Hi [~liupengcheng], the _flink-hadoop-compatibility_ artifact should be declared by 
your app in compile scope, so it is part of the user code and does not need to be in 
_flink-dist_.

The root cause of this issue is that some user-defined objects failed to be 
deserialized, which may be caused by a class version mismatch. (I suspect 
it's the Hadoop classes.)
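
One rough way to check for such a mismatch is to print, for a suspect class, which jar it was loaded from and its serialVersionUID, and then compare the output on the client side and on the cluster side. The sketch below is only an illustration and is not part of Flink; the class name ClassOriginCheck and the method describe are made up for this example.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

// Hypothetical helper (not part of Flink): reports where a class was loaded
// from and which serialVersionUID Java serialization would use for it.
public class ClassOriginCheck {

    public static String describe(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Classes from the bootstrap class loader (e.g. java.lang.String)
        // have no CodeSource, so fall back to a placeholder.
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap>"
                : source.getLocation().toString();

        // ObjectStreamClass.lookup returns null for non-Serializable classes.
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        String uid = (desc == null) ? "n/a" : Long.toString(desc.getSerialVersionUID());

        return cls.getName() + " | loaded from: " + location
                + " | serialVersionUID: " + uid;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names as arguments; defaults to a JDK class.
        String[] names = args.length > 0 ? args : new String[] {"java.lang.String"};
        for (String name : names) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

Running it with the same classpath as the job, once on the client and once on a cluster node, and passing for example org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat (the class named in the stack trace) should show whether the two sides load it from different jars or with different serialVersionUIDs. Which classes are worth checking is a guess; the Hadoop classes referenced by the input format would be the first candidates.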

Would you try deploying a pre-bundled Hadoop jar 
[e.g.|https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar]
 to your flink-dist/lib to see if it solves your problem? 
[Ref|https://flink.apache.org/downloads.html]

 

 


was (Author: zhuzh):
Hi [~liupengcheng], the _flink-hadoop-compatibility_ artifact should be used by 
your app in a compile scope so it is part of the user code and no need to be in 
_flink-dist_.

The root cause of this issue is that some user defined objects failed to be 
deserialized, which may be caused by some class version mismatch.

Would you try deploying pre-bundled Hadoop jar 
[e.g.|https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar]
 to your flink-dist/lib to see if it solves your problem? 
[Ref|[https://flink.apache.org/downloads.html]]

 

 

> Deserializing the input/output formats failed: unread block data
> ----------------------------------------------------------------
>
>                 Key: FLINK-14037
>                 URL: https://issues.apache.org/jira/browse/FLINK-14037
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>         Environment: flink 1.9.0
> app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts
>  
>            Reporter: liupengcheng
>            Priority: Major
>
> Recently, we encountered the following issue when testing flink 1.9.0:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>       at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>       at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>       at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>       at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>       ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
> initialize task 'DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390)
>  (org.apache.flink.api.scala.hadoop.mapreduce.HadoopInpu)': Loading the 
> input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>       at 
> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>       ... 10 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
>       ... 20 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output formats 
> failed: unread block data
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
>       ... 22 more
> Caused by: java.lang.IllegalStateException: unread block data
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>       at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>       at 
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>       ... 23 more
> End of exception on server side>]
>       at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>       at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>       ... 4 more
> {code}
> I checked the classpath:
> {noformat}
> Classpath: 
> lib/flink-table-blink_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/flink-table_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml:
>  {OTHER HADOOP CLASSPATH}{noformat}
> Our app code:
> {code:java}
>     val env = ExecutionEnvironment.getExecutionEnvironment
> //    env.getConfig.enableObjectReuse()
>     val dataSet = env.createInput(HadoopInputs.readHadoopFile(
>       new TeraInputFormat, classOf[Array[Byte]], classOf[Array[Byte]], inputFile))
>       .partitionCustom(new FlinkTeraSortPartitioner(new TeraSortPartitioner(partitions)), 0)
>       .sortPartition(0, Order.ASCENDING)
>     val job = Job.getInstance(new JobConf)
>     val outputFormat = new HadoopOutputFormat[Array[Byte], Array[Byte]](
>       new TeraOutputFormat, job)
>     FileOutputFormat.setOutputPath(job, new Path(outputFile))
>     dataSet.output(outputFormat)
>     env.execute("TeraSort")
> {code}
> I'm trying to find the root cause. For now, I suspect that we are missing the 
> `flink-hadoop-compatibility_2.11` package under the flink-dist directory.
>  
> What's more, I think we should give users more helpful information 
> rather than messages like this, which are hard to understand.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
