Hi Beam Dev Team,

Ning, Sam, and I are looking to add support to the InteractiveRunner for
the case where the underlying_runner is set to FlinkRunner and no
flink_master is provided, but we are running into a few issues that we
hope someone on the dev team may be more familiar with.

For context, we want to automatically create a Dataproc cluster with all
the configuration the FlinkRunner needs in order to run. So far, we are
able to get jobs to show up on the Flink dashboard for that cluster, which
is reachable through the YARN ResourceManager interface.
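
A minimal sketch of this kind of setup, assuming the Beam Python SDK: the
helper below builds the pipeline flags for a FlinkRunner job against an
existing flink_master, and a second function wraps the FlinkRunner in the
InteractiveRunner. The function names and the example address are
hypothetical, and the DOCKER environment type is an assumption rather than
something confirmed for this cluster.

```python
def flink_pipeline_args(flink_master, flink_version="1.12"):
    """Build Beam pipeline flags for a job submitted to an existing Flink cluster.

    flink_master is the host:port of the Flink JobManager REST endpoint.
    The DOCKER environment type is an assumption made for this sketch.
    """
    return [
        f"--flink_master={flink_master}",
        f"--flink_version={flink_version}",
        "--environment_type=DOCKER",
    ]


def build_interactive_pipeline(flink_master):
    """Create a pipeline that runs on Flink through the InteractiveRunner."""
    # Imported lazily so that flink_pipeline_args works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    from apache_beam.runners.portability.flink_runner import FlinkRunner

    options = PipelineOptions(flink_pipeline_args(flink_master))
    runner = InteractiveRunner(underlying_runner=FlinkRunner())
    return beam.Pipeline(runner, options=options)
```

For example, build_interactive_pipeline("my-cluster-m:8081") would target a
placeholder address; the real value is whatever host and port the cluster's
Flink JobManager exposes.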

Currently, any pipeline run against the flink_master of such a Dataproc
cluster fails: the job shows up on the Flink dashboard but fails after a
few seconds. The recurring errors are 'NoClassDefFoundError' and
'ServiceConfigurationError', as shown in the examples below. Both errors
occur with the same configuration, with NoClassDefFoundError appearing
more often.

NoClassDefFoundError:
[image: image.png]

ServiceConfigurationError:
[image: image.png]

I have also attached the corresponding root exception logs from the Flink
dashboard.
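
Both logs involve SerializablePipelineOptions: in one, the
ServiceConfigurationError is thrown from its static initializer
(<clinit>), and in the other the JVM reports that it could not initialize
that class. The JVM reports "Could not initialize class" when a class's
static initializer has already failed earlier in the same process, so the
two errors may share a root cause, though that is an assumption here. A
small sketch for pulling the "Caused by" entries out of such logs for
comparison, tolerating the line wrapping seen in the attachments (the
function name is hypothetical):

```python
def root_causes(log_text):
    """Return the 'Caused by:' entries of a Java stack trace as single lines.

    Continuation lines that follow a wrapped 'Caused by:' header are joined
    to it until the first stack frame (a line starting with 'at') is reached.
    """
    causes = []
    parts = None  # pieces of the cause currently being collected, if any
    for raw in log_text.splitlines():
        line = raw.strip()
        if line.startswith("Caused by:"):
            parts = [line[len("Caused by:"):].strip()]
            causes.append(parts)
        elif parts is not None:
            if line == "at" or line.startswith("at "):
                parts = None  # the frames begin, so the header is complete
            elif line:
                parts.append(line)
    return [" ".join(p for p in cause if p) for cause in causes]
```

Running this over each attached log gives one line per cause, which makes
it easier to see that both failures point at the same class.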

The clusters are created with apache-beam[gcp]==2.35.0 installed, a
Dataproc image that has Flink version 1.12.5 installed, and the optional
Docker and Flink Dataproc cluster components enabled. The notebook used
for these runs also uses Apache Beam 2.35.0. Is there additional setup
required for the clusters to support Flink?

Are these issues that have been seen before?

Thank you in advance,
Victor
2022-02-25 19:10:56
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
        at java.util.ServiceLoader.fail(ServiceLoader.java:239)
        at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1105)
        at org.apache.beam.runners.core.construction.SerializablePipelineOptions.<clinit>(SerializablePipelineOptions.java:38)
        at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
        at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1955)
        at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2005)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
        at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:78)
        at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1246)
        at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:599)
        at org.apache.flink.runtime.operators.BatchTask.getOutputCollector(BatchTask.java:1362)
        at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1477)
        at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1132)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:245)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        at java.lang.Thread.run(Thread.java:750)
2022-02-25 19:17:47
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions
        at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
        at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1955)
        at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2005)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
        at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:78)
        at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1246)
        at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:599)
        at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:324)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        at java.lang.Thread.run(Thread.java:750)
