[ https://issues.apache.org/jira/browse/BEAM-2943?focusedWorklogId=253663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-253663 ]
ASF GitHub Bot logged work on BEAM-2943:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Jun/19 10:11
Start Date: 04/Jun/19 10:11
Worklog Time Spent: 10m
Work Description: lgajowy commented on pull request #8744: [BEAM-2943]
Check existence of to-be-staged files in PipelineResources
URL: https://github.com/apache/beam/pull/8744
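For context, the linked PR is about failing fast when a to-be-staged path does not exist, instead of silently staging nothing and surfacing a ClassNotFoundException on the cluster (see the quoted issue below). A minimal sketch of such an existence check, assuming a plain list of paths; the class and method names here are illustrative and not the actual PipelineResources API:
{code}
import java.io.File;
import java.util.ArrayList;
import java.util.List;

class StagedFileCheck {
  /**
   * Collects every to-be-staged path that does not point to an existing file
   * and fails with a single, readable message. Illustrative only; names and
   * placement are assumptions, not Beam's implementation.
   */
  static void checkFilesExist(List<String> filesToStage) {
    List<String> missing = new ArrayList<>();
    for (String path : filesToStage) {
      if (!new File(path).exists()) {
        missing.add(path);
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalStateException("Files to stage do not exist: " + missing);
    }
  }
}
{code}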
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 253663)
Time Spent: 40m (was: 0.5h)
> Non-existing fileToStage results in ClassNotFoundException
> ----------------------------------------------------------
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3
> (2017-08-06) x86_64 GNU/Linux
> Reporter: Guenther Grill
> Assignee: Maximilian Michels
> Priority: Minor
> Labels: flink
> Fix For: Not applicable
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to
> run a Beam program on a Flink cluster.
> The output of the dependency command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO] +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO] | +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO] | \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO] +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO] | +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO] +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO] +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO] | +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO] +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO] +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started a Flink cluster with the matching version via docker-compose:
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The Flink cluster works, but when I execute
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
> --inputFile=pom.xml \
> --output=/path/to/counts \
> --flinkMaster=[HOST_IP]:6123 \
> --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
>   at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
>   ... 24 more
> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>   at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
>   ... 25 more
> Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>   at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>   ... 26 more
> {code}
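Per the issue title, the ClassNotFoundException at the bottom of the trace is the downstream symptom of a non-existing filesToStage entry: if the jar path does not resolve, nothing is shipped to the JobManager, which then cannot load SourceInputFormat while deserializing the submitted job. Until the existence check from the PR above is in place, a pipeline author can fail fast locally before submitting. A minimal sketch, assuming the to-be-staged paths are known up front; the class name and hard-coded path are illustrative only:
{code}
import java.io.File;
import java.util.Arrays;
import java.util.List;

public class StagingPreflightCheck {
  public static void main(String[] args) {
    // Mirrors the --filesToStage value from the reproduction above; adjust
    // for your build layout. Purely illustrative.
    List<String> filesToStage =
        Arrays.asList("target/word-count-beam-bundled-0.1.jar");

    for (String path : filesToStage) {
      File file = new File(path);
      if (!file.isFile()) {
        // Fail locally with a clear message instead of a remote ClassNotFoundException.
        throw new IllegalArgumentException(
            "File to stage does not exist: " + file.getAbsolutePath());
      }
    }
    System.out.println("All files to stage exist; safe to submit the pipeline.");
  }
}
{code}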
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)