[ 
https://issues.apache.org/jira/browse/FLINK-14037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16932153#comment-16932153
 ] 

liupengcheng commented on FLINK-14037:
--------------------------------------

[~zhuzh] I think you are right: the base class will always be loaded from the 
parent loader if it's in _alwaysParentFirstPatterns_. I checked the runtime 
classpath and found that the _hadoop-mapreduce-client-core_ package (which 
contains the `FileInputFormat` class) is not on the classpath, and 
YARN_APPLICATION_CLASSPATH does not include the hadoop-mapreduce-client-core 
package either, so that may explain the case:

Without my PR:

The client tries to load `TeraInputFormat` first. Resolution is parentFirst 
and TeraInputFormat's package is not in _alwaysParentFirstPatterns_, so the 
client goes through the child loader for it, and likewise for 
`FileInputFormat`. However, the child loader's loadClass delegates to the 
parent loader first, so `FileInputFormat` is finally loaded from the parent 
loader (the hadoop packages).

The remote side loads it from the user jar (only the user jar contains this 
class).

With my PR:

The client tries to load `TeraInputFormat` first. Resolution is childFirst 
and TeraInputFormat's package is not in _alwaysParentFirstPatterns_, so the 
client looks for it in the user jar. It therefore also loads 
`FileInputFormat` with the child loader and finally gets it from the user jar 
(because resolution is childFirst and the user jar contains this class).

The remote side loads it from the user jar (only the user jar contains this 
class).
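
To make the two delegation orders concrete, here is a minimal sketch of a child-first loader with a parent-first exception list. It is not Flink's actual class loader: the class name `ChildFirstSketch` and the prefixes passed in `main` are assumptions chosen only for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

/** Hypothetical child-first loader; an illustration, not Flink's implementation. */
class ChildFirstSketch extends URLClassLoader {
    private final List<String> alwaysParentFirstPatterns;

    ChildFirstSketch(URL[] userJars, ClassLoader parent, List<String> patterns) {
        super(userJars, parent);
        this.alwaysParentFirstPatterns = patterns;
    }

    /** True if the class name starts with one of the parent-first prefixes. */
    boolean isParentFirst(String className) {
        for (String prefix : alwaysParentFirstPatterns) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Standard delegation: ask the parent first, then the user jars.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child-first: look in the user jars before asking the parent.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed sample prefixes; a real deployment would read them from its configuration.
        ChildFirstSketch loader = new ChildFirstSketch(
                new URL[0],
                ChildFirstSketch.class.getClassLoader(),
                Arrays.asList("java.", "org.apache.hadoop."));
        System.out.println(loader.isParentFirst(
                "org.apache.hadoop.mapreduce.lib.input.FileInputFormat"));
        System.out.println(loader.isParentFirst(
                "com.github.ehiggs.spark.terasort.TeraInputFormat"));
        System.out.println(loader.loadClass("java.lang.String") == String.class);
    }
}
```

With a loader like this, a class that exists both in the user jar and on the parent classpath is defined by a different loader depending on the branch taken. That is why the client and the remote side can disagree about which `FileInputFormat` they use.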

 

> Deserializing the input/output formats failed: unread block data
> ----------------------------------------------------------------
>
>                 Key: FLINK-14037
>                 URL: https://issues.apache.org/jira/browse/FLINK-14037
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>         Environment: flink 1.9.0
> app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts
>  
>            Reporter: liupengcheng
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, we encountered the following issue when testing flink 1.9.0:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>       at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>       at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>       at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>       at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>       ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
> initialize task 'DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390)
>  (org.apache.flink.api.scala.hadoop.mapreduce.HadoopInpu)': Loading the 
> input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>       at 
> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>       ... 10 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
>       ... 20 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output formats 
> failed: unread block data
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
>       ... 22 more
> Caused by: java.lang.IllegalStateException: unread block data
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>       at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>       at 
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>       ... 23 more
> End of exception on server side>]
>       at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>       at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>       ... 4 more
> {code}
> I checked the classpath:
> {noformat}
> Classpath: 
> lib/flink-table-blink_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/flink-table_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml:
>  {OTHER HADOOP CLASSPATH}{noformat}
> Our app code:
> {code:java}
>     val env = ExecutionEnvironment.getExecutionEnvironment
> //    env.getConfig.enableObjectReuse()
>     val dataSet = env.createInput(HadoopInputs.readHadoopFile(
>       new TeraInputFormat, classOf[Array[Byte]], classOf[Array[Byte]], inputFile))
>       .partitionCustom(new FlinkTeraSortPartitioner(new TeraSortPartitioner(partitions)), 0)
>       .sortPartition(0, Order.ASCENDING)
>     val job = Job.getInstance(new JobConf)
>     val outputFormat = new HadoopOutputFormat[Array[Byte], Array[Byte]](
>       new TeraOutputFormat, job)
>     FileOutputFormat.setOutputPath(job, new Path(outputFile))
>     dataSet.output(outputFormat)
>     env.execute("TeraSort")
> {code}
> I'm trying to find the root cause. For now I suspect that we are missing the 
> `flink-hadoop-compatibility_2.11` package under the flink-dist directory.
>  
> Moreover, I think we should give users more useful information than messages 
> like this, which are hard to understand.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
