[
https://issues.apache.org/jira/browse/BEAM-6289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730390#comment-16730390
]
Maximilian Michels commented on BEAM-6289:
------------------------------------------
Thanks for the logs and for the great instructions on how to reproduce this.
Here's what happens:
The Cassandra source takes too long to generate its splits. Splitting is done
at the JobManager when the Beam job is sent to the cluster, and the job
submission does not receive the acknowledgement from the JobManager until the
splits are done. The submission therefore times out (the 10000 ms ask timeout
in the stack trace below), and the timeout results in a shutdown of the local
executor. Of course this wouldn't happen with a remote Flink cluster.
Two possibilities:
1. Increase the Flink setting for the Akka timeout, e.g.
{{akka.ask.timeout: 1 minute}}. You will have to set the environment variable
{{FLINK_CONF_DIR}} to the directory containing {{flink-conf.yaml}}.
Unfortunately, this will only work with the latest master and the upcoming
release, as the config wasn't previously loaded for local execution.
2. Change the splitting logic of the {{SourceInputFormat}} to split early,
before job submission, as we already do in streaming mode. This could make
sense for the next release.
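A minimal sketch of option 1, assuming a POSIX shell. The directory name
{{flink-conf}} is an arbitrary choice for illustration; the setting and the
environment variable are the ones named above.

```shell
# Sketch only: the directory name is an assumption, not taken from the issue.
CONF_DIR="$PWD/flink-conf"
mkdir -p "$CONF_DIR"

# Raise the Akka ask timeout above the 10 s seen in the stack trace.
cat > "$CONF_DIR/flink-conf.yaml" <<'EOF'
akka.ask.timeout: 1 minute
EOF

# Point the local Flink execution at the directory holding flink-conf.yaml.
export FLINK_CONF_DIR="$CONF_DIR"

# Then re-run the failing task from the issue description in the same shell:
# gradle --console=plain join-from-cassandra -Drunner=flink > output/build.log 2>&1
```

The variable has to be exported in the same shell (or build environment) that
launches the pipeline, otherwise the local executor will not see it.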
For your test, you could consider adding the snapshot repo:
https://repository.apache.org/content/repositories/snapshots/ and using Beam
2.10.0.
> Running a join on two Cassandra tables using FlinkRunner fails
> --------------------------------------------------------------
>
> Key: BEAM-6289
> URL: https://issues.apache.org/jira/browse/BEAM-6289
> Project: Beam
> Issue Type: Bug
> Components: io-java-cassandra, runner-flink
> Affects Versions: 2.8.0, 2.9.0
> Environment: Tested on Ubuntu 18
> Beam 2.8
> Tested with Flink:
> 1) [local]
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
> Tested using Cassandra [cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 |
> Native protocol v4]:
> 1) In a local container
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
> Reporter: Shahar Frank
> Assignee: Maximilian Michels
> Priority: Critical
> Labels: FlinkRunner, beam, bug, cassandra, flink, join
> Attachments: direct_runner_build.log, flink_runner_build.log
>
>
> Can't make a simple join on two Cassandra tables when using FlinkRunner.
> The same code works with DirectRunner but fails with FlinkRunner,
> giving these (as well as many other) errors:
> {code:java}
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatchere1f5abe7-6299-43ea-9182-24a2193e078f#-1757043920]]
> after [10000 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> The code can be found [here|https://github.com/srfrnk/beam-playground]
> Steps to reproduce:
> # Clone the repo to a Linux machine (I'm on Ubuntu 18, but any *nix system
> would probably work, e.g. repl.it)
> # Follow the README to set up a Cassandra container + schema
> # Run with
> {code}
> gradle --console=plain join-from-cassandra -Drunner=flink > output/build.log
> 2>&1{code}
> to use FlinkRunner. See error in log at ./output/build.log
> # Run with
> {code}
> gradle --console=plain join-from-cassandra -Drunner=direct > output/build.log
> 2>&1{code}
> to use DirectRunner. See the output in the log at ./output/build.log