[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153521#comment-16153521
 ] 

Mikhail Lipkovich edited comment on FLINK-7567 at 9/5/17 11:49 AM:
-------------------------------------------------------------------

Hi Peter,
this error is a consequence of the change made in 
https://issues.apache.org/jira/browse/FLINK-2398
The input stream and the feedback stream of an iteration are no longer allowed 
to have different parallelism. In your example the input has parallelism 1 and 
the feedback stream has parallelism 8 (see the exception message), so you can 
set the parallelism of the feedback stream to 1:

{code:java}
it => {
  // feedback stream pinned to parallelism 1, output stream left unchanged
  (it.filter(_ > 0).map(_ - 1).setParallelism(1), it.filter(_ > 0).map(_ => 'x'))
}
{code}

We should probably at least document that the keepPartitioning parameter of 
DataStream.iterate is ignored.
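
For reference, here is a minimal sketch of the complete job with the workaround applied. It has not been run against 1.3.2. The object name and the 5000 ms value passed as the second argument (maxWaitTimeMillis) are my own assumptions and not part of the original report; the timeout is there only so that the iteration head can shut down once the feedback channel has been idle, since otherwise the job may keep waiting for feedback records.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration.
object IterateOnElementsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements is a non-parallel source, so the iteration input
    // has parallelism 1 regardless of the default parallelism.
    val result = env.fromElements(1, 2, 3).iterate(
      (it: DataStream[Int]) => {
        // Feedback stream: pin the last operator to parallelism 1
        // so that it matches the parallelism of the input stream.
        val feedback = it.filter(_ > 0).map(_ - 1).setParallelism(1)
        // Output stream: may keep the default parallelism.
        val output = it.filter(_ > 0).map(_ => 'x')
        (feedback, output)
      },
      5000L // assumed timeout in ms, see note above
    )

    result.print()
    env.execute()
  }
}
```

An alternative that should have the same effect in a small test is to call env.setParallelism(1) before building the job, so that every operator, including the feedback stream, defaults to parallelism 1.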




> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-7567
>                 URL: https://issues.apache.org/jira/browse/FLINK-7567
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.2
>         Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>            Reporter: Peter Ertl
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just to get the iteration going ...
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
>       at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>       at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>       at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>       at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>       at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>       at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterative stream setup I could imagine, this 
> error makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
