[
https://issues.apache.org/jira/browse/FLINK-13369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026144#comment-17026144
]
Andrew Roberts commented on FLINK-13369:
----------------------------------------
{code:scala}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.annotation.tailrec
object TestJob {
val jobName = "TestJob"
def input(n: Int) = {
@tailrec def inner(current: Int, acc: List[String]): List[String] = {
if (current > n) acc.reverse
else inner(current + 1, current.toString :: acc)
}
inner(0, Nil)
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
job(env, input(10000))
env.execute(jobName)
}
def job[Record: TypeInformation](
env: StreamExecutionEnvironment,
input: List[Record]
) = {
val source = env.addSource(new Src(input))
source.addSink(new Sink[Record])
}
class Sink[Record] extends SinkFunction[Record] {
override def invoke(value: Record, context: SinkFunction.Context[_]) = ()
}
class Src[Record](in: List[Record]) extends SourceFunction[Record] {
def run(ctx: SourceFunction.SourceContext[Record]) =
in.foreach(ctx.collect)
def cancel() = ()
}
}
{code}
> Recursive closure cleaner ends up with stackOverflow in case of circular
> dependency
> -----------------------------------------------------------------------------------
>
> Key: FLINK-13369
> URL: https://issues.apache.org/jira/browse/FLINK-13369
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.8.1, 1.9.0
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.2, 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)