[ https://issues.apache.org/jira/browse/BEAM-6289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008236#comment-17008236 ]

sophie commented on BEAM-6289:
------------------------------

2. Change the splitting logic of the SourceInputFormat to perform early 
splitting before job submission like we do in the streaming mode. This could 
make sense for the next release.

What is the status of the SourceInputFormat splitting change? Has there been any progress? I don't see any related issue. Or is there another long-term fix?

> Running a join on two Cassandra tables using FlinkRunner fails
> --------------------------------------------------------------
>
>                 Key: BEAM-6289
>                 URL: https://issues.apache.org/jira/browse/BEAM-6289
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-cassandra, runner-flink
>    Affects Versions: 2.8.0, 2.9.0
>         Environment: Tested on Ubuntu 18
> Beam 2.8
> Tested with Flink:
> 1) [local]
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
> Tested using Cassandra [cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]:
> 1) In a local container
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
>            Reporter: Shahar Frank
>            Assignee: Maximilian Michels
>            Priority: Critical
>              Labels: FlinkRunner, beam, bug, cassandra, flink, join
>             Fix For: Not applicable
>
>         Attachments: direct_runner_build.log, flink_runner_build.log
>
>
> Can't perform a simple join on two Cassandra tables when using the FlinkRunner.
> The same code works with the DirectRunner but fails with the FlinkRunner, producing these (as well as many other) errors:
> {code:java}
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatchere1f5abe7-6299-43ea-9182-24a2193e078f#-1757043920]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The code can be found [here|https://github.com/srfrnk/beam-playground]
> Steps to reproduce:
>  # Clone the repo onto a Linux machine (I'm on Ubuntu 18, but any *nix system should work, e.g. repl.it)
>  # Follow the README to set up a Cassandra container + schema
>  # Run with
> {code}
> gradle --console=plain join-from-cassandra -Drunner=flink > output/build.log 2>&1
> {code}
> to use the FlinkRunner. See the error in the log at ./output/build.log
>  # Run with
> {code}
> gradle --console=plain join-from-cassandra -Drunner=direct > output/build.log 2>&1
> {code}
> to use the DirectRunner. See the log at ./output/build.log



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
