[
https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148762#comment-16148762
]
ASF GitHub Bot commented on FLINK-7561:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4626#discussion_r136293381
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/PreAggregationOperator.java
---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This operator perform preliminary aggregation of the input values on
non-keyed stream. This means that the output
+ * is not fully aggregated, but only partially. It should be placed before
keyBy of the final aggregation. This make it
+ * useful in couple of scenarios:
+ * <ol>
+ * <li>Performing keyBy operation with low number of
distinct values in the key. In such case
+ * {@link PreAggregationOperator} can reduce both CPU
usage and network usage, by pre-aggregating most of the
+ * values before shuffling them over the network.</li>
+ * <li>Increasing the parallelism above the number of distinct
values for the task preceding the keyBy operation.</li>
+ * <li>Handling the data skew of some of the key values. Normally
if there is a data skew, a lot of work could be
+ * dumped onto one single CPU core in the cluster. With pre
aggregation some of that work can be performed in more
+ * distributed fashion by the {@link PreAggregationOperator}.</li>
+ * <li>Output partitioning of the data source is correlated with
keyBy partitioning. For example when data source
+ * is partitioned by day and keyBy function shuffles the data
based by day and hour.</li>
+ * </ol>
+ *
+ * <p>Because this operator performs only pre aggregation, it doesn't
output the result of {@link AggregateFunction}
+ * but rather it outputs a tuple containing the Key, Window, and
Accumulator, where Accumulator is a partially
+ * aggregated result {@link AggregateFunction}.
+ *
+ * <p>Keep in mind that {@link PreAggregationOperator} can have
significant higher memory consumption compared to
+ * normal aggregation. If the input data are either not partitioned or the
input partitioning is not correlated with
+ * the {@code keySelector}, each instance {@link PreAggregationOperator}
can end up having each own accumulators entry
+ * per each key. In other words in that case memory consumption is
expected to be {@code parallelism} times larger
+ * compared to what {@link
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} would
have.
+ *
+ * <p>It is expected that this operator should be followed by keyBy
operation based on {@code tuple.f0} and after that
+ * followed by {@link
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} which
perform
+ * {@link AggregateFunction#merge(ACC, ACC)}.
+ *
+ * <p>Because currently {@link PreAggregationOperator} does not use {@link
org.apache.flink.streaming.api.TimerService}
+ * only two elements triggering policies are supported:
+ * <ol>
+ * <li>Flush everything on any watermark.</li>
+ * <li>Iterate over each element in the state on each watermark and
emit it if watermark's timestamp exceeds
+ * {@link Window#maxTimestamp()}.</li>
+ * </ol>
+ * The first option has a drawback that it will often unnecessary emit
elements, that could potentially be further
+ * aggregated. The second one is quite CPU intensive if watermarks are
emitted relatively often to the number of pre
+ * aggregated key values.
+ *
+ * <p>Other limitations and notes:
+ * <ol>
+ * <li>{@link MergingWindowAssigner} is not supported.</li>
+ * <li>{@link PreAggregationOperator} emits all of its data on each
received watermark.</li>
+ * <li>On restoring from checkpoint keys can be randomly shuffled
between {@link PreAggregationOperator}
+ * instances</li>.
+ * </ol>
+ */
+@PublicEvolving
+public class PreAggregationOperator<K, IN, ACC, W extends Window>
+ extends AbstractStreamOperator<Tuple3<K, W, ACC>>
+ implements OneInputStreamOperator<IN, Tuple3<K, W, ACC>>, Serializable {
+
+ protected final AggregateFunction<IN, ACC, ?> aggregateFunction;
+ protected final KeySelector<IN, K> keySelector;
+ protected final WindowAssigner<? super IN, W> windowAssigner;
+ protected final TypeInformation<K> keyTypeInformation;
+ protected final TypeInformation<ACC> accumulatorTypeInformation;
+ protected final boolean flushAllOnWatermark;
+ protected final Map<Tuple2<K, W>, ACC> aggregates = new HashMap<>();
+
+ protected transient WindowAssigner.WindowAssignerContext
windowAssignerContext;
+ protected transient ListState<Tuple3<K, W, ACC>> aggregatesState;
+
+ /**
+ * Creates {@link PreAggregationOperator}.
+ *
+ * @param aggregateFunction function used for aggregation. Note, {@link
AggregateFunction#getResult(Object)} will
+ * not be used.
+ * @param keySelector
+ * @param keyTypeInformation
+ * @param accumulatorTypeInformation
+ * @param windowAssigner
+ * @param flushAllOnWatermark flag to control whether all elements
should be emitted on any watermark. Check more
+ * information in {@link
PreAggregationOperator}.
+ */
+ public PreAggregationOperator(
+ AggregateFunction<IN, ACC, ?> aggregateFunction,
+ KeySelector<IN, K> keySelector,
+ TypeInformation<K> keyTypeInformation,
+ TypeInformation<ACC> accumulatorTypeInformation,
+ WindowAssigner<? super IN, W> windowAssigner,
+ boolean flushAllOnWatermark) {
+ this.aggregateFunction = checkNotNull(aggregateFunction,
"aggregateFunction is null");
+ this.keySelector = checkNotNull(keySelector, "keySelector is
null");
+ this.windowAssigner = checkNotNull(windowAssigner,
"windowAssigner is null");
+ this.keyTypeInformation = checkNotNull(keyTypeInformation,
"keyTypeInformation is null");
+ this.accumulatorTypeInformation =
checkNotNull(accumulatorTypeInformation, "accumulatorTypeInformation is null");
+ this.flushAllOnWatermark = flushAllOnWatermark;
+
+ checkNotNull(keyTypeInformation, "keyTypeInformation is null");
+ checkNotNull(accumulatorTypeInformation,
"accumulatorTypeInformation is null");
+
+ checkArgument(!(windowAssigner instanceof
MergingWindowAssigner),
+ "MergingWindowAssigner is not supported by the
PreAggregationOperator");
+ }
+
+ @Override
+ public void open() throws Exception {
+ windowAssignerContext = new
WindowAssigner.WindowAssignerContext() {
+ @Override
+ public long getCurrentProcessingTime() {
+ return System.currentTimeMillis();
--- End diff --
This should use the `ProcessingTimeService` that can be retrieved via
`getProcessingTimeService()`. Then you can also test the operator with
processing time because the test harness allows advancing processing time
manually.
> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
> Key: FLINK-7561
> URL: https://issues.apache.org/jira/browse/FLINK-7561
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)