[
https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967340#comment-15967340
]
ASF GitHub Bot commented on FLINK-6298:
---------------------------------------
GitHub user wenlong88 opened a pull request:
https://github.com/apache/flink/pull/3716
[FLINK-6298] Local execution is not setting RuntimeContext for RichOutputFormat
Call setRuntimeContext on the OutputFormat when the OutputFormat is a
RichOutputFormat.
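The change described above can be sketched as follows. This is a minimal, self-contained illustration and not the code from the pull request: `RuntimeContext`, `OutputFormat`, and `RichOutputFormat` here are simplified stand-ins for the Flink types of the same name, and `FormatSink` is a hypothetical wrapper playing the role of the sink that owns the format.

```scala
// Simplified stand-ins for the Flink types (assumptions, not the real API).
trait RuntimeContext {
  def taskName: String
}

trait OutputFormat[T] {
  def open(taskNumber: Int, numTasks: Int): Unit
  def writeRecord(record: T): Unit
  def close(): Unit
}

abstract class RichOutputFormat[T] extends OutputFormat[T] {
  private var context: Option[RuntimeContext] = None

  def setRuntimeContext(c: RuntimeContext): Unit = {
    context = Some(c)
  }

  // Fails loudly if nobody forwarded the context, which is the reported symptom.
  def getRuntimeContext: RuntimeContext =
    context.getOrElse(
      throw new IllegalStateException("The runtime context has not been set."))
}

// Hypothetical sink wrapper: when it receives its own runtime context,
// it forwards the context to the wrapped format if that format is "rich".
class FormatSink[T](format: OutputFormat[T]) {
  def setRuntimeContext(c: RuntimeContext): Unit = format match {
    case rich: RichOutputFormat[_] => rich.setRuntimeContext(c)
    case _                         => () // plain formats have no context to set
  }
}
```

With this forwarding in place, a rich format wrapped in the sink can call `getRuntimeContext` from `open` without hitting the uninitialized state.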
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/wenlong88/flink jira-6298
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3716.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3716
----
commit 050d3262b2882cb6ca1a6b6a23ad7d49f76b1e94
Author: wenlong.lwl <[email protected]>
Date: 2017-04-13T09:23:15Z
Set runtimeContext to output format when it is a RichOutputFormat
----
> Local execution is not setting RuntimeContext for RichOutputFormat
> ------------------------------------------------------------------
>
> Key: FLINK-6298
> URL: https://issues.apache.org/jira/browse/FLINK-6298
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.1.0, 1.2.0
> Reporter: Mateusz Zakarczemny
> Assignee: Wenlong Lyu
>
> RuntimeContext is never set on a RichOutputFormat. I tested this in local
> execution. RichMapFunction is set up correctly.
> The following code never prints "//////Context set in RichOutputFormat":
> {code}
> import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
> import org.apache.flink.api.common.io.RichOutputFormat
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> object Startup {
>   def main(args: Array[String]): Unit = {
>     val mapFunction = new RichMapFunction[String, String] {
>       def open(taskNumber: Int, numTasks: Int): Unit = { getRuntimeContext }
>       def map(event: String) = { event }
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichMapFunction")
>         super.setRuntimeContext(t)
>       }
>     }
>     val outputFormat = new RichOutputFormat[String] {
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichOutputFormat")
>         super.setRuntimeContext(t)
>       }
>       def open(taskNumber: Int, numTasks: Int): Unit = {}
>       def writeRecord(event: String): Unit = {
>         println(event)
>       }
>       def configure(parameters: Configuration): Unit = {}
>       def close(): Unit = {}
>     }
>     val see = StreamExecutionEnvironment.getExecutionEnvironment
>     val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction)
>     eventsStream.writeUsingOutputFormat(outputFormat)
>     see.execute("test-job")
>   }
> }
> {code}