[https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15604566#comment-15604566]

Stefan Richter commented on FLINK-4883:
---------------------------------------

Sure, here is the example from the mailing list:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
 * Co-flat-map UDF used to reproduce FLINK-4883.
 *
 * Numbers arriving on the first input are buffered. Once a string has been stored in
 * keyed value state via the second input, every buffered number is emitted paired with
 * that string.
 *
 * NOTE: this is deliberately declared as a Scala `object`, i.e. a single shared instance.
 * As the issue text describes, Flink could then use this one instance across several
 * operator instances, which leads to job failures. That is exactly what the report asks
 * Flink to detect and reject. A regular class instantiated with `new` would give each
 * use its own instance instead.
 */
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  // Plain Scala buffer (not Flink state) of numbers received on input 1.
  // It is a field of the singleton, so it is shared by every key processed here and by
  // any operator instance that uses this object.
  // NOTE(review): it is not registered through the runtime context, so it is presumably
  // not included in checkpoints. Confirm.
  val buffer = ArrayBuffer.empty[Long]

  // Handle to the value state holding the latest string from input 2; assigned in open().
  // Because this object is a singleton, every open() call overwrites this same field.
  @transient var state: ValueState[String] = _

  /** Registers the "state-descriptor" value state, passing "" as its default value. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    state = getRuntimeContext.getState(new 
ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  /** Input 2: stores the received string in the value state for the current key. Emits nothing. */
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

  /**
   * Input 1: appends `value` to the shared buffer.
   *
   * If the state for the current key holds a non-empty string (i.e. it differs from the
   * "" default), every buffered number is emitted paired with that string and the buffer
   * is cleared. The buffer is not per key, so numbers buffered while processing one key
   * can be emitted with the string of another key.
   */
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      buffer.clear()
    }
  }
}

object StreamingPipeline {

  /**
   * Builds and runs the FLINK-4883 reproduction job.
   *
   * A generated sequence of numbers and a two-element string stream are connected,
   * both keyed by parity, and processed by [[FlatMapper]]. Results are printed.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Exactly-once checkpoints every 30 seconds, event time, and the in-memory state backend.
    val checkpointIntervalMillis = 30 * 1000
    env.enableCheckpointing(checkpointIntervalMillis, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val numbers = env.generateSequence(0, 1000)
    val words = env.fromElements("even", "odd")

    // Both inputs use a Boolean key: true for even numbers and for the word "even".
    val keyedByParity = numbers
      .connect(words)
      .keyBy(n => n % 2 == 0, w => w == "even")

    keyedByParity
      .flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}
{code}

> Prevent UDF implementations through Scala singleton objects
> ------------------------------------------------------------
>
>                 Key: FLINK-4883
>                 URL: https://issues.apache.org/jira/browse/FLINK-4883
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Stefan Richter
>            Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this causes problems: the UDF is then a singleton that Flink could
> use across several operator instances, which leads to job failures. We
> should detect and prevent the use of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to