[ https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15604566#comment-15604566 ]
Stefan Richter commented on FLINK-4883: --------------------------------------- Sure, here is the example from the mailing list: {code} import org.apache.flink.api.scala._ import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.util.Collector import scala.collection.mutable.ArrayBuffer object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] { val buffer = ArrayBuffer.empty[Long] @transient var state: ValueState[String] = _ override def open(parameters: Configuration): Unit = { super.open(parameters) state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")) } override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = { state.update(value) } override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = { buffer += value if (state.value() != "") { for (elem ← buffer) { out.collect((elem, state.value())) } buffer.clear() } } } object StreamingPipeline { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(new MemoryStateBackend()) val pipeline1 = env.generateSequence(0, 1000) val pipeline2 = env.fromElements("even", "odd") pipeline1.connect(pipeline2) .keyBy( elem ⇒ elem % 2 == 0, elem ⇒ elem == "even" ).flatMap(FlatMapper) .print() env.execute("Example") } } {code} > Prevent UDFs implementations through Scala singleton objects > ------------------------------------------------------------ > > Key: 
FLINK-4883 > URL: https://issues.apache.org/jira/browse/FLINK-4883 > Project: Flink > Issue Type: Bug > Reporter: Stefan Richter > Assignee: Renkai Ge > > Currently, users can create and use UDFs in Scala like this: > {code} > object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] > { > ... > } > {code} > However, this leads to problems because the UDF is now a singleton that Flink > could use across several operator instances, which leads to job failures. We > should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)