[
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900585#comment-14900585
]
ASF GitHub Bot commented on FLINK-2283:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1155#discussion_r39965297
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Integration test ensuring that the persistent state defined by the
implementations
+ * of {@link AbstractUdfStreamOperator} is correctly restored in case of
recovery from
+ * a failure.
+ *
+ * <p>
+ * The topology currently tests the proper behaviour of the {@link
StreamGroupedReduce}
+ * operator.
+ */
+@SuppressWarnings("serial")
+public class UdfStreamOperatorCheckpointingITCase extends
StreamFaultToleranceTestBase {
+
+ final private static long NUM_INPUT = 2_500_000L;
+ final private static int NUM_OUTPUT = 1_000;
+
+ /**
+ * Assembles a stream of a grouping field and some long data. Applies
reduce functions
+ * on this stream.
+ */
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+
+ // base stream
+ GroupedDataStream<Tuple2<Integer, Long>> stream =
env.addSource(new StatefulMultipleSequence())
+ .groupBy(0);
+
+
+ stream
+ // testing built-in aggregate
+ .min(1)
+ // failure generation
+ .map(new
OnceFailingIdentityMapFunction(NUM_INPUT))
+ .groupBy(0)
+ .addSink(new MinEvictingQueueSink());
+
+ stream
+ // testing UDF reducer
+ .reduce(new ReduceFunction<Tuple2<Integer,
Long>>() {
+ @Override
+ public Tuple2<Integer, Long> reduce(
+ Tuple2<Integer, Long>
value1, Tuple2<Integer, Long> value2) throws Exception {
+ return Tuple2.of(value1.f0,
value1.f1 + value2.f1);
+ }
+ })
+ .groupBy(0)
+ .addSink(new SumEvictingQueueSink());
+
+ stream
+ // testing UDF folder
+ .fold(Tuple2.of(0, 0L), new
FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> fold(
+ Tuple2<Integer, Long>
accumulator, Tuple2<Integer, Long> value) throws Exception {
+ return Tuple2.of(value.f0,
accumulator.f1 + value.f1);
+ }
+ })
+ .groupBy(0)
+ .addSink(new FoldEvictingQueueSink());
--- End diff --
The formatting looks off on GitHub.
> Make grouped reduce/fold/aggregations stateful using Partitioned state
> ----------------------------------------------------------------------
>
> Key: FLINK-2283
> URL: https://issues.apache.org/jira/browse/FLINK-2283
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Gyula Fora
> Assignee: Márton Balassi
> Priority: Minor
>
> Currently, the inner state of the grouped aggregations is not persisted as
> operator state.
> These operators should be reimplemented to use the newly introduced
> partitioned state abstractions, which will make them fault-tolerant and
> scalable in the future.
> One suggested implementation is to use a stateful mapper to achieve the
> desired behaviour.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)