yuanyangwu commented on a change in pull request #7175: 
[FLINK-11004][Documentation] wrong ProcessWindowFunction.process argument in 
window document
URL: https://github.com/apache/flink/pull/7175#discussion_r236528195
 
 

 ##########
 File path: docs/dev/stream/operators/windows.md
 ##########
 @@ -836,12 +836,12 @@ input
   .reduce(
     (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
     ( key: String,
-      window: TimeWindow,
+      context: Context,
 
 Review comment:
   Thanks for the quick review.
   The unqualified `Context` is enough, because in both Java and Scala it resolves to the right `Context` type nested in `ProcessWindowFunction`.
   
   Here are Scala and Java examples from the Flink code base.
   
   
https://github.com/apache/flink/blob/master/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala#L213
   
   ```scala
       val windowOperator = dataStream
         .assignTimestampsAndWatermarks(new TestAssigner)
         .keyBy(i => i._1)
         .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
         .sideOutputLateData(lateDataTag)
         .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
           override def process(
               key:String,
               context: Context,
               elements: Iterable[(String, Int)],
               out: Collector[String]): Unit = {
             for (in <- elements) {
               out.collect(in._1)
             }
           }
         })
   ```
   
   
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java#L756
   ```java
       SingleOutputStreamOperator<Integer> windowOperator = dataStream
               .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
               .keyBy(new TestKeySelector())
               .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
               .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                   private static final long serialVersionUID = 1L;

                   @Override
                   public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                       out.collect(integer);
                       context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
                   }
               });
   ```
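To illustrate why the unqualified `Context` is enough, here is a minimal, Flink-free sketch of the same name-resolution rule. Everything in it is hypothetical: `WindowFn` stands in for `ProcessWindowFunction`, its `Context` is simplified to an interface with a single made-up `windowEnd()` method, and a plain `List` replaces `Collector`. The point is only that a member type declared in the base class is inherited by the anonymous subclass, so it can be named without qualification there, while code outside the class hierarchy has to qualify it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ContextResolutionSketch {

    // Hypothetical stand-in for ProcessWindowFunction (NOT the Flink class).
    abstract static class WindowFn<IN, OUT, KEY> {
        // Member type declared in the base class; subclasses inherit the name.
        interface Context {
            long windowEnd(); // made-up accessor, for illustration only
        }

        abstract void process(KEY key, Context context, Iterable<IN> elements, List<OUT> out);
    }

    static List<String> run() {
        WindowFn<Integer, String, String> fn = new WindowFn<Integer, String, String>() {
            @Override
            void process(String key, Context context, Iterable<Integer> elements, List<String> out) {
                // Inside the subclass body, plain "Context" resolves to WindowFn.Context.
                for (Integer e : elements) {
                    out.add(key + ":" + e + "@" + context.windowEnd());
                }
            }
        };

        // Outside the subclass body the name has to be qualified.
        WindowFn.Context ctx = () -> 42L;

        List<String> out = new ArrayList<>();
        fn.process("sensor_1", ctx, Arrays.asList(1, 2), out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [sensor_1:1@42, sensor_1:2@42]
    }
}
```

The documentation snippet under review is in the same position as the anonymous subclass here: its function body is written where `ProcessWindowFunction`'s members are in scope, so `context: Context` needs no further qualification.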
