aljoscha closed pull request #7274: [FLINK-11090][streaming api] Remove unused
parameter in WindowedStream.aggregate()
URL: https://github.com/apache/flink/pull/7274
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 78a7273f6af..e8bfda089db 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -199,7 +199,7 @@ class DataStreamGroupWindowAggregate(
createKeyedWindowedStream(queryConfig, window, keyedStream)
.asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
- val (aggFunction, accumulatorRowType, aggResultRowType) =
+ val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
@@ -211,7 +211,7 @@ class DataStreamGroupWindowAggregate(
tableEnv.getConfig)
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType,
aggResultRowType, outRowType)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(keyedAggOpName)
}
// global / non-keyed aggregation
@@ -225,7 +225,7 @@ class DataStreamGroupWindowAggregate(
createNonKeyedWindowedStream(queryConfig, window, timestampedInput)
.asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
- val (aggFunction, accumulatorRowType, aggResultRowType) =
+ val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
@@ -237,7 +237,7 @@ class DataStreamGroupWindowAggregate(
tableEnv.getConfig)
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType,
aggResultRowType, outRowType)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(nonKeyedAggOpName)
}
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 1e2df6e0f93..4a508551315 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1037,7 +1037,7 @@ object AggregateUtil {
groupingKeys: Array[Int],
needMerge: Boolean,
tableConfig: TableConfig)
- : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
+ : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo) = {
val needRetract = false
val (aggFields, aggregates, isDistinctAggs, accTypes, _) =
@@ -1068,13 +1068,10 @@ object AggregateUtil {
None
)
- val aggResultTypes = namedAggregates.map(a =>
FlinkTypeFactory.toTypeInfo(a.left.getType))
-
val accumulatorRowType = new RowTypeInfo(accTypes: _*)
- val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
val aggFunction = new AggregateAggFunction(genFunction)
- (aggFunction, accumulatorRowType, aggResultRowType)
+ (aggFunction, accumulatorRowType)
}
/**
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 3f935e34e56..3728844220f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -473,7 +473,7 @@ public AllWindowedStream(DataStream<T> input,
}
return aggregate(function, new PassThroughAllWindowFunction<W,
R>(),
- accumulatorType, resultType, resultType);
+ accumulatorType, resultType);
}
/**
@@ -510,7 +510,7 @@ public AllWindowedStream(DataStream<T> input,
TypeInformation<R> resultType =
getAllWindowFunctionReturnType(windowFunction, aggResultType);
- return aggregate(aggFunction, windowFunction, accumulatorType,
aggResultType, resultType);
+ return aggregate(aggFunction, windowFunction, accumulatorType,
resultType);
}
private static <IN, OUT> TypeInformation<OUT>
getAllWindowFunctionReturnType(
@@ -566,13 +566,11 @@ public AllWindowedStream(DataStream<T> input,
AggregateFunction<T, ACC, V> aggregateFunction,
AllWindowFunction<V, R, W> windowFunction,
TypeInformation<ACC> accumulatorType,
- TypeInformation<V> aggregateResultType,
TypeInformation<R> resultType) {
checkNotNull(aggregateFunction, "aggregateFunction");
checkNotNull(windowFunction, "windowFunction");
checkNotNull(accumulatorType, "accumulatorType");
- checkNotNull(aggregateResultType, "aggregateResultType");
checkNotNull(resultType, "resultType");
if (aggregateFunction instanceof RichFunction) {
@@ -857,7 +855,7 @@ public AllWindowedStream(DataStream<T> input,
* @param resultType Type information for the result type of the window
function
* @return The data stream that is the result of applying the window
function to the window.
*
- * @deprecated use {@link #aggregate(AggregateFunction,
AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
+ * @deprecated use {@link #aggregate(AggregateFunction,
AllWindowFunction, TypeInformation, TypeInformation)} instead
*/
@PublicEvolving
@Deprecated
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 4f8243ccc3a..24e1d1260f5 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -601,7 +601,7 @@ public WindowedStream(KeyedStream<T, K> input,
* @param windowResultType The process window function result type.
* @return The data stream that is the result of applying the fold
function to the window.
*
- * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction,
TypeInformation, TypeInformation, TypeInformation)} instead
+ * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction,
TypeInformation, TypeInformation)} instead
*/
@Deprecated
@Internal
@@ -728,7 +728,7 @@ public WindowedStream(KeyedStream<T, K> input,
}
return aggregate(function, new PassThroughWindowFunction<K, W,
R>(),
- accumulatorType, resultType, resultType);
+ accumulatorType, resultType);
}
/**
@@ -765,7 +765,7 @@ public WindowedStream(KeyedStream<T, K> input,
TypeInformation<R> resultType =
getWindowFunctionReturnType(windowFunction, aggResultType);
- return aggregate(aggFunction, windowFunction, accumulatorType,
aggResultType, resultType);
+ return aggregate(aggFunction, windowFunction, accumulatorType,
resultType);
}
/**
@@ -793,13 +793,11 @@ public WindowedStream(KeyedStream<T, K> input,
AggregateFunction<T, ACC, V> aggregateFunction,
WindowFunction<V, R, K, W> windowFunction,
TypeInformation<ACC> accumulatorType,
- TypeInformation<V> aggregateResultType,
TypeInformation<R> resultType) {
checkNotNull(aggregateFunction, "aggregateFunction");
checkNotNull(windowFunction, "windowFunction");
checkNotNull(accumulatorType, "accumulatorType");
- checkNotNull(aggregateResultType, "aggregateResultType");
checkNotNull(resultType, "resultType");
if (aggregateFunction instanceof RichFunction) {
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index f21c0fb4840..f2373432b53 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -318,12 +318,10 @@ class AllWindowedStream[T, W <: Window](javaStream:
JavaAllWStream[T, W]) {
val applyFunction = new ScalaAllWindowFunctionWrapper[V, R,
W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] =
implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] =
implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**
@@ -384,12 +382,10 @@ class AllWindowedStream[T, W <: Window](javaStream:
JavaAllWStream[T, W]) {
val applyFunction = new ScalaAllWindowFunction[V, R,
W](cleanWindowFunction)
val accumulatorType: TypeInformation[ACC] =
implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] =
implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanPreAggregator, applyFunction, accumulatorType, resultType))
}
// ----------------------------- fold() -------------------------------------
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 337e5ce036f..a440e29716b 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -303,12 +303,10 @@ class WindowedStream[T, K, W <: Window](javaStream:
JavaWStream[T, K, W]) {
val applyFunction = new ScalaWindowFunctionWrapper[V, R, K,
W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] =
implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] =
implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**
@@ -333,12 +331,10 @@ class WindowedStream[T, K, W <: Window](javaStream:
JavaWStream[T, K, W]) {
val applyFunction = new ScalaWindowFunction[V, R, K,
W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] =
implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] =
implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services