[FLINK-2550] Add Window Aggregations to new Windowing API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a38bb7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a38bb7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a38bb7 Branch: refs/heads/master Commit: 28a38bb7dedbc10ceab9d4ae1dbcc15789e33211 Parents: 0bac272 Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 6 17:37:39 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:08:25 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/AllWindowedStream.java | 205 +++++++++++++++++++ .../api/datastream/WindowedStream.java | 205 +++++++++++++++++++ .../aggregation/ComparableAggregator.java | 11 +- .../functions/aggregation/SumAggregator.java | 2 +- .../windowing/util/SessionWindowingData.java | 2 +- .../streaming/api/scala/AllWindowedStream.scala | 96 +++++++++ .../streaming/api/scala/WindowedStream.scala | 96 +++++++++ .../StreamingScalaAPICompletenessTest.scala | 3 + 8 files changed, 612 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 0cc1854..89c4857 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -25,6 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; +import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; +import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -222,6 +225,204 @@ public class AllWindowedStream<T, W extends Window> { } // ------------------------------------------------------------------------ + // Aggregations on the windows + // ------------------------------------------------------------------------ + + /** + * Applies an aggregation that sums every window of the data stream at the + * given position. + * + * @param positionToSum The position in the tuple/array to sum + * @return The transformed DataStream. + */ + public DataStream<T> sum(int positionToSum) { + return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig())); + } + + /** + * Applies an aggregation that sums every window of the pojo data stream at + * the given field for every window. + * + * <p> + * A field expression is either + * the name of a public field or a getter method with parentheses of the + * stream's underlying type. A dot can be used to drill down into objects, + * as in {@code "field1.getInnerField2()" }. + * + * @param field The field to sum + * @return The transformed DataStream. 
+ */ + public DataStream<T> sum(String field) { + return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum value of every window + * of the data stream at the given position. + * + * @param positionToMin The position to minimize + * @return The transformed DataStream. + */ + public DataStream<T> min(int positionToMin) { + return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum value of the pojo data + * stream at the given field expression for every window. + * + * <p> + * A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @return The transformed DataStream. + */ + public DataStream<T> min(String field) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given position. If more elements have the same + * minimum value the operator returns the first element by default. + * + * @param positionToMinBy + * The position to minimize by + * @return The transformed DataStream. + */ + public DataStream<T> minBy(int positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given field expression. If more elements have the same + * minimum value the operator returns the first element by default.
+ * + * @param positionToMinBy The field expression to minimize by + * @return The transformed DataStream. + */ + public DataStream<T> minBy(String positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given position. If more elements have the same + * minimum value the operator returns either the first or last one depending + * on the parameter setting. + * + * @param positionToMinBy The position to minimize + * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last + * @return The transformed DataStream. + */ + public DataStream<T> minBy(int positionToMinBy, boolean first) { + return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum element of the pojo + * data stream by the given field expression for every window. A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @param first If true, then in case of field equality the first object will be returned + * @return The transformed DataStream. + */ + public DataStream<T> minBy(String field, boolean first) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum value of every window of + * the data stream at the given position. + * + * @param positionToMax The position to maximize + * @return The transformed DataStream.
+ */ + public DataStream<T> max(int positionToMax) { + return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum value of the pojo data + * stream at the given field expression for every window. A field expression + * is either the name of a public field or a getter method with parentheses + * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill + * down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @return The transformed DataStream. + */ + public DataStream<T> max(String field) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given position. If more elements have the same + * maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The position to maximize by + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(int positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given field expression. If more elements have the same + * maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The field expression to maximize by + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(String positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given position.
If more elements have the same + * maximum value the operator returns either the first or last one depending + * on the parameter setting. + * + * @param positionToMaxBy The position to maximize by + * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(int positionToMaxBy, boolean first) { + return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum element of the pojo + * data stream by the given field expression for every window. A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @param first If true, then in case of field equality the first object will be returned + * @return The transformed DataStream.
+ */ + public DataStream<T> maxBy(String field, boolean first) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); + } + + private DataStream<T> aggregate(AggregationFunction<T> aggregator) { + return reduce(aggregator); + } + + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -238,4 +439,8 @@ public class AllWindowedStream<T, W extends Window> { public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } + + public TypeInformation<T> getInputType() { + return input.getType(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 0ea9cad..1273b42 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -26,6 +26,9 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; +import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; +import 
org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -241,6 +244,204 @@ public class WindowedStream<T, K, W extends Window> { } // ------------------------------------------------------------------------ + // Aggregations on the keyed windows + // ------------------------------------------------------------------------ + + /** + * Applies an aggregation that sums every window of the data stream at the + * given position. + * + * @param positionToSum The position in the tuple/array to sum + * @return The transformed DataStream. + */ + public DataStream<T> sum(int positionToSum) { + return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig())); + } + + /** + * Applies an aggregation that sums every window of the pojo data stream at + * the given field for every window. + * + * <p> + * A field expression is either + * the name of a public field or a getter method with parentheses of the + * stream's underlying type. A dot can be used to drill down into objects, + * as in {@code "field1.getInnerField2()" }. + * + * @param field The field to sum + * @return The transformed DataStream. + */ + public DataStream<T> sum(String field) { + return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum value of every window + * of the data stream at the given position. + * + * @param positionToMin The position to minimize + * @return The transformed DataStream.
+ */ + public DataStream<T> min(int positionToMin) { + return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum value of the pojo data + * stream at the given field expression for every window. + * + * <p> + * A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @return The transformed DataStream. + */ + public DataStream<T> min(String field) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given position. If more elements have the same + * minimum value the operator returns the first element by default. + * + * @param positionToMinBy + * The position to minimize by + * @return The transformed DataStream. + */ + public DataStream<T> minBy(int positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given field expression. If more elements have the same + * minimum value the operator returns the first element by default. + * + * @param positionToMinBy The field expression to minimize by + * @return The transformed DataStream. + */ + public DataStream<T> minBy(String positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given position.
If more elements have the same + * minimum value the operator returns either the first or last one depending + * on the parameter setting. + * + * @param positionToMinBy The position to minimize + * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last + * @return The transformed DataStream. + */ + public DataStream<T> minBy(int positionToMinBy, boolean first) { + return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the minimum element of the pojo + * data stream by the given field expression for every window. A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @param first If true, then in case of field equality the first object will be returned + * @return The transformed DataStream. + */ + public DataStream<T> minBy(String field, boolean first) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum value of every window of + * the data stream at the given position. + * + * @param positionToMax The position to maximize + * @return The transformed DataStream. + */ + public DataStream<T> max(int positionToMax) { + return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum value of the pojo data + * stream at the given field expression for every window.
A field expression + * is either the name of a public field or a getter method with parentheses + * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill + * down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @return The transformed DataStream. + */ + public DataStream<T> max(String field) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given position. If more elements have the same + * maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The position to maximize by + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(int positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given field expression. If more elements have the same + * maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The field expression to maximize by + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(String positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given position. If more elements have the same + * maximum value the operator returns either the first or last one depending + * on the parameter setting. + * + * @param positionToMaxBy The position to maximize by + * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last + * @return The transformed DataStream.
+ */ + public DataStream<T> maxBy(int positionToMaxBy, boolean first) { + return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); + } + + /** + * Applies an aggregation that gives the maximum element of the pojo + * data stream by the given field expression for every window. A field + * expression is either the name of a public field or a getter method with + * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used + * to drill down into objects, as in {@code "field1.getInnerField2()" }. + * + * @param field The field expression based on which the aggregation will be applied. + * @param first If true, then in case of field equality the first object will be returned + * @return The transformed DataStream. + */ + public DataStream<T> maxBy(String field, boolean first) { + return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); + } + + private DataStream<T> aggregate(AggregationFunction<T> aggregator) { + return reduce(aggregator); + } + + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -307,4 +508,8 @@ public class WindowedStream<T, K, W extends Window> { public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } + + public TypeInformation<T> getInputType() { + return input.getType(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java index 766a59e..e5501a0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java @@ -33,8 +33,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) { super(pos); this.comparator = Comparator.getForAggregation(aggregationType); - this.byAggregate = (aggregationType == AggregationType.MAXBY) - || (aggregationType == AggregationType.MINBY); + this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY); this.first = first; } @@ -61,7 +60,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { @SuppressWarnings("unchecked") @Override public T reduce(T value1, T value2) throws Exception { - Comparable<Object> o1 = (Comparable<Object>)fieldAccessor.get(value1); + Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1); Object o2 = fieldAccessor.get(value2); int c = comparator.isExtremal(o1, o2); @@ -79,10 +78,10 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { return value2; } else { - if (c == 1) { - value2 = fieldAccessor.set(value2, o1); + if (c == 0) { + value1 = fieldAccessor.set(value1, o2); } - return value2; + return value1; } } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java index c23695e..b045233 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java @@ -43,6 +43,6 @@ public class SumAggregator<T> extends AggregationFunction<T> { @SuppressWarnings("unchecked") @Override public T reduce(T value1, T value2) throws Exception { - return fieldAccessor.set(value2, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); + return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java index bb4a123..c1a99a8 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java @@ -19,7 +19,7 @@ 
package org.apache.flink.streaming.examples.windowing.util; public class SessionWindowingData { - public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,5,3)\n" + + public static final String EXPECTED = "(a,1,1)\n" + "(c,6,1)\n" + "(c,11,1)\n" + "(b,1,3)\n" + "(a,10,1)"; private SessionWindowingData() { http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 9054d95..d2d0a1d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType +import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger @@ -134,6 +136,96 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { javaStream.apply(clean(function), implicitly[TypeInformation[R]]) } 
+ // ------------------------------------------------------------------------ + // Aggregations on the windows + // ------------------------------------------------------------------------ + + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given position. + */ + def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) + + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given field. + */ + def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given position. + */ + def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given field. + */ + def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field) + + /** + * Applies an aggregation that sums the elements in the window at the given position. + */ + def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) + + /** + * Applies an aggregation that sums the elements in the window at the given field. + */ + def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given position. In case of equality, returns the first. + */ + def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, + position) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given field. In case of equality, returns the first. + */ + def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, + field) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given position. In case of equality, returns the first.
+ */ + def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, + position) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given field. In case of equality, returns the first. + */ + def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, + field) + + private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { + val position = fieldNames2Indices(getInputType(), Array(field))(0) + aggregate(aggregationType, position) + } + + def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = { + + val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]] + + val reducer = aggregationType match { + case AggregationType.SUM => + new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig) + + case _ => + new ComparableAggregator( + position, + jStream.getInputType, + aggregationType, + true, + jStream.getExecutionEnvironment.getConfig) + } + + new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] + } // ------------------------------------------------------------------------ // Utilities @@ -147,4 +239,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) } + /** + * Gets the input type.
+ */ + private def getInputType(): TypeInformation[T] = javaStream.getInputType } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 2d6806d..3963765 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} +import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType +import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger @@ -137,6 +139,96 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { javaStream.apply(clean(function), implicitly[TypeInformation[R]]) } + // ------------------------------------------------------------------------ + // Aggregations on the keyed windows + // ------------------------------------------------------------------------ + + /** + * Applies an aggregation that that gives the maximum of the elements in 
the window at + * the given position. + */ + def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) + + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given field. + */ + def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given position. + */ + def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given field. + */ + def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field) + + /** + * Applies an aggregation that sums the elements in the window at the given position. + */ + def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) + + /** + * Applies an aggregation that sums the elements in the window at the given field. + */ + def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given position. In case of equality, returns the first. + */ + def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, + position) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given field. In case of equality, returns the first. + */ + def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, + field) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given position. In case of equality, returns the first. + */ + def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, + position) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given field. In case of equality, returns the first.
+ */ + def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, + field) + + private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { + val position = fieldNames2Indices(getInputType(), Array(field))(0) + aggregate(aggregationType, position) + } + + def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = { + + val jStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]] + + val reducer = aggregationType match { + case AggregationType.SUM => + new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig) + + case _ => + new ComparableAggregator( + position, + jStream.getInputType, + aggregationType, + true, + jStream.getExecutionEnvironment.getConfig) + } + + new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] + } // ------------------------------------------------------------------------ // Utilities @@ -150,4 +242,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) } + /** + * Gets the input type.
+ */ + private def getInputType(): TypeInformation[T] = javaStream.getInputType } http://git-wip-us.apache.org/repos/asf/flink/blob/28a38bb7/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index c6bd87a..53aa1e2 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -51,11 +51,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2", "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine", "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform", + "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig", "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment", + "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType", "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment", + "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType", "org.apache.flink.streaming.api.datastream.KeyedStream.transform", "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
