[
https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568955#comment-16568955
]
Elias Levy commented on FLINK-9600:
-----------------------------------
[~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape
hatch when you can't perform what you want within the higher level DSL. The
improvement I am suggestion is within the higher level DSL.
E.g.it is a lot nicer to write:
{code:java}
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter extends ProcessFunction[T,T] {
override def processElement(value: T, ctx: Context, out: Collector[T]): Unit
={
if (isDayTime(ctx.timestamp))
out.collect(value) }
}
}
dataStream.process(new ProcessFilter())
{code}
> Add DataStream transformation variants that pass timestamp to the user
> function
> -------------------------------------------------------------------------------
>
> Key: FLINK-9600
> URL: https://issues.apache.org/jira/browse/FLINK-9600
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Elias Levy
> Priority: Minor
>
> It is often necessary to access the timestamp assigned to records within user
> functions. At the moment this is only possible from {{RichFunction}}.
> Implementing a {{RichFunction}} just to access the timestamp is burdensome,
> so most job carry a duplicate of the timestamp within the record.
> It would be useful if {{DataStream}} provided transformation methods that
> accepted user functions that could be passed the record's timestamp as an
> additional argument, similar to how there are two variants of {{flatMap}},
> one with an extra parameter that gives the user function access to the output
> {{Collector}}.
> Along similar lines, it may be useful to have variants that pass the record's
> key as an additional parameter.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)