Hi Radu, there are already several WindowFunction implementations in the Table API that can help as a reference:
- IncrementalAggregateAllTimeWindowFunction [1]
- IncrementalAggregateAllWindowFunction [2]
- IncrementalAggregateTimeWindowFunction [3]
- IncrementalAggregateTimeWindowFunction [4]

Also have a look at the DataStreamAggregate [5] class, which assembles the DataStream programs based on these functions.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
[3] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
[4] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
[5] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala

2017-03-10 18:46 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I am struggling to move a working implementation from Java to Scala
> :(...this is for computing window aggregates (sliding window).
> As I am not proficient in Scala I got blocked on (probably a stupid
> error)...maybe someone can help me.
>
> I am trying to create a simple window function to be applied to the
> datastream after the window is created (I have one case with global windows
> and another case with keyed windows, so the question applies to both
> AllWindowFunction and WindowFunction). However, I get a
> type mismatch error when applying the function to the window.
>
> As I need to implement the function in Scala... I tried 2 options, which
> both fail:
>
> Option 1: implement MyWindowFunction by extending the WindowFunction from
> the scala package (org.apache.flink.streaming.api.scala.function)
> ..in this case when I apply the function to the window it tells me that
> there is a type mismatch
>
> Option 2: implement MyWindowFunction by extending the WindowFunction from
> the default package (org.apache.flink.streaming.api.functions.windowing)
> ..in this case when I try to override the apply function I get a
> compilation error that the class needs to be abstract as it does not
> implement the apply function :(
>
> ...any solution?
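
For the two options in the quoted message, a minimal sketch of Option 1 may help as a starting point. It is written against the Flink 1.2-era Scala DataStream API and is not taken from the original code, which is not shown in the thread: the element type `(String, Long)`, the sum aggregate, the window sizes, and the names `MyWindowFunction` and `SlidingWindowSketch` are all illustrative assumptions. Two things commonly produce the errors described, and either may or may not be the cause here. First, keying by field position or name (e.g. `keyBy(0)`) makes the key type the Java `Tuple`, so the `KEY` type parameter of the function no longer matches; keying with a selector function keeps the actual field type. Second, the `apply` method of the Java interface takes a `java.lang.Iterable`, so an override declared with Scala's `Iterable` does not implement it, which would explain the "needs to be abstract" error in Option 2.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical window function: sums the Long field of all elements per key.
// Type parameters are [IN, OUT, KEY, W]; KEY must match what keyBy produces.
class MyWindowFunction
    extends WindowFunction[(String, Long), (String, Long), String, TimeWindow] {

  // `input` is a scala.Iterable here, not a java.lang.Iterable.
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[(String, Long)],
      out: Collector[(String, Long)]): Unit = {
    out.collect((key, input.map(_._2).sum))
  }
}

object SlidingWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed sample input; a real job would read from an unbounded source.
    val events: DataStream[(String, Long)] =
      env.fromElements(("a", 1L), ("b", 2L), ("a", 3L))

    events
      .keyBy(_._1) // selector function, so the key type is String
      .timeWindow(Time.seconds(10), Time.seconds(5)) // sliding window
      .apply(new MyWindowFunction)
      .print()

    env.execute("sliding window sketch")
  }
}
```

The non-keyed case follows the same pattern with `AllWindowFunction[IN, OUT, W]` from the same scala package, whose `apply` has no `key` parameter, applied after `timeWindowAll` instead of `keyBy(...).timeWindow(...)`.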