[
https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905279#comment-15905279
]
ASF GitHub Bot commented on FLINK-6023:
---------------------------------------
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/3510#discussion_r105422108
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
<div data-lang="scala" markdown="1">
{% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
// the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
// apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
- .keyBy(0)
- .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+ .keyBy(0)
+ .process(new CountWithTimeoutFunction())
/**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {
/** The state that is maintained by this process function */
- lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
- .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+ lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+ .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
+ val (key, _) = value
--- End diff --
Sorry, I didn't make myself clear. What the IDE complains about is that the
variable name `key` conflicts with the following lines:
```
case CountWithTimestamp(key, count, _) =>
  CountWithTimestamp(key, count + 1, ctx.timestamp)
```
It's not clear whether you want to use the `key` you just defined or the
`key` bound by the match pattern.
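To illustrate the concern, here is a minimal sketch without any Flink dependency. The `update` and `updateClear` helpers, the `ShadowingSketch` object, and the `case null` branch are assumptions made up for this example (a `null` state stands in for an unset `ValueState`); only `CountWithTimestamp` comes from the diff. In Scala, a lower-case identifier in a pattern always introduces a new variable, so the `key` in the pattern shadows the outer `key` rather than being compared with it:

```scala
// Case class as declared in the docs snippet under review.
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

// Hypothetical stand-in for the state update inside processElement.
object ShadowingSketch {

  // Ambiguous version: two different variables are both called `key`.
  def update(value: (String, Long), current: CountWithTimestamp, timestamp: Long): CountWithTimestamp = {
    val (key, _) = value // outer `key`, taken from the incoming element
    current match {
      case null =>
        // No pattern variable here, so this is the outer `key`.
        CountWithTimestamp(key, 1, timestamp)
      case CountWithTimestamp(key, count, _) =>
        // This `key` is a fresh pattern variable bound to current.key;
        // it shadows the outer `key` inside this branch.
        CountWithTimestamp(key, count + 1, timestamp)
    }
  }

  // Same behavior, with distinct names so the intent is explicit.
  def updateClear(value: (String, Long), current: CountWithTimestamp, timestamp: Long): CountWithTimestamp = {
    val (elementKey, _) = value
    current match {
      case null =>
        CountWithTimestamp(elementKey, 1, timestamp)
      case CountWithTimestamp(storedKey, count, _) =>
        CountWithTimestamp(storedKey, count + 1, timestamp)
    }
  }
}
```

On a keyed stream the two keys would normally be equal, so the shadowing is mostly a readability issue, but distinct names remove the ambiguity the IDE flags.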
> Fix Scala snippet into Process Function (Low-level Operations) Doc
> ------------------------------------------------------------------
>
> Key: FLINK-6023
> URL: https://issues.apache.org/jira/browse/FLINK-6023
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Reporter: Mauro Cortellazzi
> Assignee: Mauro Cortellazzi
> Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the
> Scala snippet