[FLINK-2550] Add Window Aggregations to new Windowing API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a38bb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a38bb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a38bb7

Branch: refs/heads/master
Commit: 28a38bb7dedbc10ceab9d4ae1dbcc15789e33211
Parents: 0bac272
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 6 17:37:39 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 205 +++++++++++++++++++
 .../api/datastream/WindowedStream.java          | 205 +++++++++++++++++++
 .../aggregation/ComparableAggregator.java       |  11 +-
 .../functions/aggregation/SumAggregator.java    |   2 +-
 .../windowing/util/SessionWindowingData.java    |   2 +-
 .../streaming/api/scala/AllWindowedStream.scala |  96 +++++++++
 .../streaming/api/scala/WindowedStream.scala    |  96 +++++++++
 .../StreamingScalaAPICompletenessTest.scala     |   3 +
 8 files changed, 612 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0cc1854..89c4857 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -222,6 +225,204 @@ public class AllWindowedStream<T, W extends Window> {
        }
 
        // 
------------------------------------------------------------------------
+       //  Aggregations on the  windows
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies an aggregation that sums every window of the data stream at 
the
+        * given position.
+        *
+        * @param positionToSum The position in the tuple/array to sum
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> sum(int positionToSum) {
+               return aggregate(new SumAggregator<>(positionToSum, 
input.getType(), input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that sums every window of the pojo data 
stream at
+        * the given field for every window.
+        *
+        * <p>
+        * A field expression is either
+        * the name of a public field or a getter method with parentheses of the
+        * stream's underlying type. A dot can be used to drill down into 
objects,
+        * as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field to sum
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> sum(String field) {
+               return aggregate(new SumAggregator<>(field, input.getType(), 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum value of every 
window
+        * of the data stream at the given position.
+        *
+        * @param positionToMin The position to minimize
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> min(int positionToMin) {
+               return aggregate(new ComparableAggregator<>(positionToMin, 
input.getType(), AggregationFunction.AggregationType.MIN, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum value of the pojo 
data
+        * stream at the given field expression for every window.
+        *
+        * <p>
+        * A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream}S underlying type. A dot can be 
used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> min(String field) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MIN, false, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns the first element by default.
+        *
+        * @param positionToMinBy
+        *            The position to minimize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(int positionToMinBy) {
+               return this.minBy(positionToMinBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns the first element by default.
+        *
+        * @param positionToMinBy The position to minimize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(String positionToMinBy) {
+               return this.minBy(positionToMinBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns either the first or last one 
depending
+        * on the parameter setting.
+        *
+        * @param positionToMinBy The position to minimize
+        * @param first If true, then the operator return the first element 
with the minimum value, otherwise returns the last
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(int positionToMinBy, boolean first) {
+               return aggregate(new ComparableAggregator<>(positionToMinBy, 
input.getType(), AggregationFunction.AggregationType.MINBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum element of the 
pojo
+        * data stream by the given field expression for every window. A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream DataStreams} underlying type. A 
dot can be used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @param first If True then in case of field equality the first object 
will be returned
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MINBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum value of every window 
of
+        * the data stream at the given position.
+        *
+        * @param positionToMax The position to maximize
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> max(int positionToMax) {
+               return aggregate(new ComparableAggregator<>(positionToMax, 
input.getType(), AggregationFunction.AggregationType.MAX, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the maximum value of the pojo 
data
+        * stream at the given field expression for every window. A field 
expression
+        * is either the name of a public field or a getter method with 
parentheses
+        * of the {@link DataStream DataStreams} underlying type. A dot can be 
used to drill
+        * down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> max(String field) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MAX, false, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns the first by default.
+        *
+        * @param positionToMaxBy
+        *            The position to maximize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(int positionToMaxBy) {
+               return this.maxBy(positionToMaxBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns the first by default.
+        *
+        * @param positionToMaxBy
+        *            The position to maximize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(String positionToMaxBy) {
+               return this.maxBy(positionToMaxBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns either the first or last one 
depending
+        * on the parameter setting.
+        *
+        * @param positionToMaxBy The position to maximize by
+        * @param first If true, then the operator return the first element 
with the maximum value, otherwise returns the last
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+               return aggregate(new ComparableAggregator<>(positionToMaxBy, 
input.getType(), AggregationFunction.AggregationType.MAXBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the maximum element of the 
pojo
+        * data stream by the given field expression for every window. A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream}S underlying type. A dot can be 
used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @param first If True then in case of field equality the first object 
will be returned
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MAXBY, first, 
input.getExecutionConfig()));
+       }
+
+       private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+               return reduce(aggregator);
+       }
+
+       // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
 
@@ -238,4 +439,8 @@ public class AllWindowedStream<T, W extends Window> {
        public StreamExecutionEnvironment getExecutionEnvironment() {
                return input.getExecutionEnvironment();
        }
+
+       public TypeInformation<T> getInputType() {
+               return input.getType();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0ea9cad..1273b42 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -26,6 +26,9 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -241,6 +244,204 @@ public class WindowedStream<T, K, W extends Window> {
        }
 
        // 
------------------------------------------------------------------------
+       //  Aggregations on the keyed windows
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies an aggregation that sums every window of the data stream at 
the
+        * given position.
+        *
+        * @param positionToSum The position in the tuple/array to sum
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> sum(int positionToSum) {
+               return aggregate(new SumAggregator<>(positionToSum, 
input.getType(), input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that sums every window of the pojo data 
stream at
+        * the given field for every window.
+        *
+        * <p>
+        * A field expression is either
+        * the name of a public field or a getter method with parentheses of the
+        * stream's underlying type. A dot can be used to drill down into 
objects,
+        * as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field to sum
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> sum(String field) {
+               return aggregate(new SumAggregator<>(field, input.getType(), 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum value of every 
window
+        * of the data stream at the given position.
+        *
+        * @param positionToMin The position to minimize
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> min(int positionToMin) {
+               return aggregate(new ComparableAggregator<>(positionToMin, 
input.getType(), AggregationFunction.AggregationType.MIN, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum value of the pojo 
data
+        * stream at the given field expression for every window.
+        *
+        * <p>
+        * A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream}S underlying type. A dot can be 
used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> min(String field) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MIN, false, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns the first element by default.
+        *
+        * @param positionToMinBy
+        *            The position to minimize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(int positionToMinBy) {
+               return this.minBy(positionToMinBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns the first element by default.
+        *
+        * @param positionToMinBy The position to minimize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(String positionToMinBy) {
+               return this.minBy(positionToMinBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the minimum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * minimum value the operator returns either the first or last one 
depending
+        * on the parameter setting.
+        *
+        * @param positionToMinBy The position to minimize
+        * @param first If true, then the operator return the first element 
with the minimum value, otherwise returns the last
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(int positionToMinBy, boolean first) {
+               return aggregate(new ComparableAggregator<>(positionToMinBy, 
input.getType(), AggregationFunction.AggregationType.MINBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the minimum element of the 
pojo
+        * data stream by the given field expression for every window. A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream DataStreams} underlying type. A 
dot can be used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @param first If True then in case of field equality the first object 
will be returned
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> minBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MINBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum value of every window 
of
+        * the data stream at the given position.
+        *
+        * @param positionToMax The position to maximize
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> max(int positionToMax) {
+               return aggregate(new ComparableAggregator<>(positionToMax, 
input.getType(), AggregationFunction.AggregationType.MAX, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the maximum value of the pojo 
data
+        * stream at the given field expression for every window. A field 
expression
+        * is either the name of a public field or a getter method with 
parentheses
+        * of the {@link DataStream DataStreams} underlying type. A dot can be 
used to drill
+        * down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> max(String field) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MAX, false, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns the first by default.
+        *
+        * @param positionToMaxBy
+        *            The position to maximize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(int positionToMaxBy) {
+               return this.maxBy(positionToMaxBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns the first by default.
+        *
+        * @param positionToMaxBy
+        *            The position to maximize by
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(String positionToMaxBy) {
+               return this.maxBy(positionToMaxBy, true);
+       }
+
+       /**
+        * Applies an aggregation that gives the maximum element of every 
window of
+        * the data stream by the given position. If more elements have the same
+        * maximum value the operator returns either the first or last one 
depending
+        * on the parameter setting.
+        *
+        * @param positionToMaxBy The position to maximize by
+        * @param first If true, then the operator return the first element 
with the maximum value, otherwise returns the last
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
+               return aggregate(new ComparableAggregator<>(positionToMaxBy, 
input.getType(), AggregationFunction.AggregationType.MAXBY, first, 
input.getExecutionConfig()));
+       }
+
+       /**
+        * Applies an aggregation that that gives the maximum element of the 
pojo
+        * data stream by the given field expression for every window. A field
+        * expression is either the name of a public field or a getter method 
with
+        * parentheses of the {@link DataStream}S underlying type. A dot can be 
used
+        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+        *
+        * @param field The field expression based on which the aggregation 
will be applied.
+        * @param first If True then in case of field equality the first object 
will be returned
+        * @return The transformed DataStream.
+        */
+       public DataStream<T> maxBy(String field, boolean first) {
+               return aggregate(new ComparableAggregator<>(field, 
input.getType(), AggregationFunction.AggregationType.MAXBY, first, 
input.getExecutionConfig()));
+       }
+
+       private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
+               return reduce(aggregator);
+       }
+
+       // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
 
@@ -307,4 +508,8 @@ public class WindowedStream<T, K, W extends Window> {
        public StreamExecutionEnvironment getExecutionEnvironment() {
                return input.getExecutionEnvironment();
        }
+
+       public TypeInformation<T> getInputType() {
+               return input.getType();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index 766a59e..e5501a0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -33,8 +33,7 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
        private ComparableAggregator(int pos, AggregationType aggregationType, 
boolean first) {
                super(pos);
                this.comparator = Comparator.getForAggregation(aggregationType);
-               this.byAggregate = (aggregationType == AggregationType.MAXBY)
-                               || (aggregationType == AggregationType.MINBY);
+               this.byAggregate = (aggregationType == AggregationType.MAXBY) 
|| (aggregationType == AggregationType.MINBY);
                this.first = first;
        }
 
@@ -61,7 +60,7 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
        @SuppressWarnings("unchecked")
        @Override
        public T reduce(T value1, T value2) throws Exception {
-               Comparable<Object> o1 = 
(Comparable<Object>)fieldAccessor.get(value1);
+               Comparable<Object> o1 = (Comparable<Object>) 
fieldAccessor.get(value1);
                Object o2 = fieldAccessor.get(value2);
 
                int c = comparator.isExtremal(o1, o2);
@@ -79,10 +78,10 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
                        return value2;
 
                } else {
-                       if (c == 1) {
-                               value2 = fieldAccessor.set(value2, o1);
+                       if (c == 0) {
+                               value1 = fieldAccessor.set(value1, o2);
                        }
-                       return value2;
+                       return value1;
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index c23695e..b045233 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -43,6 +43,6 @@ public class SumAggregator<T> extends AggregationFunction<T> {
        @SuppressWarnings("unchecked")
        @Override
        public T reduce(T value1, T value2) throws Exception {
-               return fieldAccessor.set(value2, 
adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+               return fieldAccessor.set(value1, 
adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
index bb4a123..c1a99a8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.examples.windowing.util;
 
 public class SessionWindowingData {
 
-       public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + 
"(c,11,1)\n" + "(b,5,3)\n" +
+       public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + 
"(c,11,1)\n" + "(b,1,3)\n" +
                        "(a,10,1)";
 
        private SessionWindowingData() {

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 9054d95..d2d0a1d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => 
JavaAllWStream}
+import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -134,6 +136,96 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  // ------------------------------------------------------------------------
+  //  Aggregations on the keyed windows
+  // ------------------------------------------------------------------------
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the 
window at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, 
position)
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the 
window at
+   * the given field.
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the 
window at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, 
position)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the 
window at
+   * the given field.
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given 
position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, 
position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given 
field.
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window 
by
+   * the given position. When equality, returns the first.
+   */
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window 
by
+   * the given field. When equality, returns the first.
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window 
by
+   * the given position. When equality, returns the first.
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window 
by
+   * the given field. When equality, returns the first.
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): 
DataStream[T] = {
+    val position = fieldNames2Indices(getInputType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  def aggregate(aggregationType: AggregationType, position: Int): 
DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]]
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM =>
+        new SumAggregator(position, jStream.getInputType, 
jStream.getExecutionEnvironment.getConfig)
+
+      case _ =>
+        new ComparableAggregator(
+          position,
+          jStream.getInputType,
+          aggregationType,
+          true,
+          jStream.getExecutionEnvironment.getConfig)
+    }
+
+    new 
DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
 
   // ------------------------------------------------------------------------
   //  Utilities
@@ -147,4 +239,8 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
 
+  /**
+   * Gets the output type.
+   */
+  private def getInputType(): TypeInformation[T] = javaStream.getInputType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 2d6806d..3963765 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => 
JavaWStream}
+import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
@@ -137,6 +139,96 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     javaStream.apply(clean(function), implicitly[TypeInformation[R]])
   }
 
+  // ------------------------------------------------------------------------
+  //  Aggregations on the keyed windows
+  // ------------------------------------------------------------------------
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the 
window at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, 
position)
+
+  /**
+   * Applies an aggregation that that gives the maximum of the elements in the 
window at
+   * the given field.
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the 
window at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, 
position)
+
+  /**
+   * Applies an aggregation that that gives the minimum of the elements in the 
window at
+   * the given field.
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given 
position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, 
position)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given 
field.
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window 
by
+   * the given position. When equality, returns the first.
+   */
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the maximum element of the window 
by
+   * the given field. When equality, returns the first.
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window 
by
+   * the given position. When equality, returns the first.
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+
+  /**
+   * Applies an aggregation that that gives the minimum element of the window 
by
+   * the given field. When equality, returns the first.
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): 
DataStream[T] = {
+    val position = fieldNames2Indices(getInputType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  def aggregate(aggregationType: AggregationType, position: Int): 
DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]]
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM =>
+        new SumAggregator(position, jStream.getInputType, 
jStream.getExecutionEnvironment.getConfig)
+
+      case _ =>
+        new ComparableAggregator(
+          position,
+          jStream.getInputType,
+          aggregationType,
+          true,
+          jStream.getExecutionEnvironment.getConfig)
+    }
+
+    new 
DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
 
   // ------------------------------------------------------------------------
   //  Utilities
@@ -150,4 +242,8 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
 
+  /**
+   * Gets the output type.
+   */
+  private def getInputType(): TypeInformation[T] = javaStream.getInputType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index c6bd87a..53aa1e2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -51,11 +51,14 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
       
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
+
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
       
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
 
       
"org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
       
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
+      
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
 
       "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
       "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",

Reply via email to