[FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose 
DataStream job

This closes #5941.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6e40f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6e40f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6e40f0

Branch: refs/heads/release-1.5
Commit: ef6e40f008e9a25eb2ebbe86ef256cd4bf254663
Parents: 0d5d086
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Apr 30 18:05:46 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:49:05 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java |   7 +
 .../tests/DataStreamAllroundTestProgram.java    |  28 +++-
 .../ArtificalOperatorStateMapper.java           | 159 +++++++++++++++++++
 .../test-scripts/test_resume_savepoint.sh       |  19 +++
 4 files changed, 205 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 2577460..c2e4cf5 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import 
org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
 import 
org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
 import 
org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder;
 import 
org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
@@ -285,6 +286,12 @@ class DataStreamAllroundTestJobFactory {
                return new ArtificialKeyedStateMapper<>(mapFunction, 
artificialStateBuilders);
        }
 
+       static <IN, OUT> ArtificalOperatorStateMapper<IN, OUT> 
createArtificialOperatorStateMapper(
+               MapFunction<IN, OUT> mapFunction) {
+
+               return new ArtificalOperatorStateMapper<>(mapFunction);
+       }
+
        static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
                JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
                TypeSerializer<STATE> typeSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 75c14e5..5ae1d16 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
@@ -29,6 +30,7 @@ import 
org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
 import java.util.Collections;
 
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
@@ -43,13 +45,17 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
  *     <li>A generic Kryo input type.</li>
  *     <li>A state type for which we register a {@link KryoSerializer}.</li>
  *     <li>Operators with {@link ValueState}.</li>
+ *     <li>Operators with union state.</li>
+ *     <li>Operators with broadcast state.</li>
  * </ul>
  *
  * <p>The cli job configuration options are described in {@link 
DataStreamAllroundTestJobFactory}.
  *
  */
 public class DataStreamAllroundTestProgram {
-       private static final String STATE_OPER_NAME = 
"ArtificalKeyedStateMapper";
+       private static final String KEYED_STATE_OPER_NAME = 
"ArtificalKeyedStateMapper";
+       private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
+       private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
 
        public static void main(String[] args) throws Exception {
                final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -58,7 +64,7 @@ public class DataStreamAllroundTestProgram {
 
                setupEnvironment(env, pt);
 
-               env.addSource(createEventSource(pt))
+               DataStream<Event> eventStream = 
env.addSource(createEventSource(pt))
                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
                        .keyBy(Event::getKey)
                        .map(createArtificialKeyedStateMapper(
@@ -66,20 +72,26 @@ public class DataStreamAllroundTestProgram {
                                        (MapFunction<Event, Event>) in -> in,
                                        // state is verified and updated per 
event as a wrapped ComplexPayload state object
                                        (Event first, ComplexPayload second) -> 
{
-                                                       if (second != null && 
!second.getStrPayload().equals(STATE_OPER_NAME)) {
+                                                       if (second != null && 
!second.getStrPayload().equals(KEYED_STATE_OPER_NAME)) {
                                                                
System.out.println("State is set or restored incorrectly");
                                                        }
-                                                       return new 
ComplexPayload(first, STATE_OPER_NAME);
+                                                       return new 
ComplexPayload(first, KEYED_STATE_OPER_NAME);
                                                },
                                        Collections.singletonList(
                                                new 
KryoSerializer<>(ComplexPayload.class, env.getConfig()))
                                )
                        )
-                       .name(STATE_OPER_NAME)
-                       .returns(Event.class)
-                       .keyBy(Event::getKey)
+                       .name(KEYED_STATE_OPER_NAME)
+                       .returns(Event.class);
+
+               DataStream<Event> eventStream2 = eventStream
+                       
.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
+                       .name(OPERATOR_STATE_OPER_NAME)
+                       .returns(Event.class);
+
+               eventStream2.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
-                       .name("SemanticsCheckMapper")
+                       .name(SEMANTICS_CHECK_MAPPER_NAME)
                        .addSink(new PrintSinkFunction<>());
 
                env.execute("General purpose test job");

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
new file mode 100644
index 0000000..61501ea
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A self-verifiable {@link RichMapFunction} used to verify checkpointing and 
restore semantics for various
+ * kinds of operator state.
+ *
+ * <p>For verifying broadcast state, each subtask stores as broadcast 
state a map of (Integer, String) entries,
+ * key being the subtask index, and value being a String that corresponds to 
the subtask index. The total number
+ * of subtasks is also stored as broadcast state. On restore, each subtask 
should be restored with exactly the same
+ * broadcast state, with one entry for each subtask in the previous run.
+ *
+ * <p>For verifying union state, each subtask of this operator stores its own 
subtask index as a subset of the whole
+ * union state. On restore, each subtask's restored union state should have 
one entry for each subtask in the previous
+ * run.
+ *
+ * <p>All input elements to the operator are simply passed through a 
user-provided map function and emitted.
+ */
+public class ArtificalOperatorStateMapper<IN, OUT> extends RichMapFunction<IN, 
OUT> implements CheckpointedFunction {
+
+       private static final long serialVersionUID = -1741298597425077761L;
+
+       // 
============================================================================
+       //  State names
+       // 
============================================================================
+
+       private static final String LAST_NUM_SUBTASKS_STATE_NAME = 
"lastNumSubtasksState";
+       private static final String BROADCAST_STATE_NAME = "broadcastState";
+       private static final String UNION_STATE_NAME = "unionState";
+
+       // 
============================================================================
+       //  Keys used in broadcast states
+       // 
============================================================================
+
+       private static final String LAST_NUM_SUBTASKS_STATE_KEY = 
"lastNumSubtasks";
+       private static final String BROADCAST_STATE_ENTRY_VALUE_PREFIX = 
"broadcastStateEntry-";
+
+       private final MapFunction<IN, OUT> mapFunction;
+
+       private transient BroadcastState<String, Integer> 
lastNumSubtasksBroadcastState;
+
+       private transient BroadcastState<Integer, String> 
broadcastElementsState;
+       private transient ListState<Integer> unionElementsState;
+
+       public ArtificalOperatorStateMapper(MapFunction<IN, OUT> mapFunction) {
+               this.mapFunction = Preconditions.checkNotNull(mapFunction);
+       }
+
+       @Override
+       public OUT map(IN value) throws Exception {
+               return mapFunction.map(value);
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.lastNumSubtasksBroadcastState = 
context.getOperatorStateStore()
+                       .getBroadcastState(new 
MapStateDescriptor<>(LAST_NUM_SUBTASKS_STATE_NAME, StringSerializer.INSTANCE, 
IntSerializer.INSTANCE));
+
+               this.broadcastElementsState = context.getOperatorStateStore()
+                       .getBroadcastState(new 
MapStateDescriptor<>(BROADCAST_STATE_NAME, IntSerializer.INSTANCE, 
StringSerializer.INSTANCE));
+
+               this.unionElementsState = context.getOperatorStateStore()
+                       .getUnionListState(new 
ListStateDescriptor<>(UNION_STATE_NAME, IntSerializer.INSTANCE));
+
+               if (context.isRestored()) {
+                       Integer lastNumSubtasks = 
lastNumSubtasksBroadcastState.get(LAST_NUM_SUBTASKS_STATE_KEY);
+                       Preconditions.checkState(lastNumSubtasks != null);
+
+                       // -- verification for broadcast state --
+                       Set<Integer> expected = new HashSet<>();
+                       for (int i = 0; i < lastNumSubtasks; i++) {
+                               expected.add(i);
+                       }
+
+                       for (Map.Entry<Integer, String> broadcastElementEntry : 
broadcastElementsState.entries()) {
+                               int key = broadcastElementEntry.getKey();
+                               Preconditions.checkState(expected.remove(key), 
"Unexpected keys in restored broadcast state.");
+                               
Preconditions.checkState(broadcastElementEntry.getValue().equals(getBroadcastStateEntryValue(key)),
 "Incorrect value in restored broadcast state.");
+                       }
+
+                       Preconditions.checkState(expected.size() == 0, "Missing 
keys in restored broadcast state.");
+
+                       // -- verification for union state --
+                       for (int i = 0; i < lastNumSubtasks; i++) {
+                               expected.add(i);
+                       }
+
+                       for (Integer subtaskIndex : unionElementsState.get()) {
+                               
Preconditions.checkState(expected.remove(subtaskIndex), "Unexpected element in 
restored union state.");
+                       }
+                       Preconditions.checkState(expected.size() == 0, "Missing 
elements in restored union state.");
+               } else {
+                       // verify that the broadcast / union state is actually 
empty if this is not a restored run, as told by the state context;
+                       // this catches incorrect behavior of 
context.isRestored() when using broadcast state / union state.
+
+                       
Preconditions.checkState(!lastNumSubtasksBroadcastState.iterator().hasNext());
+                       
Preconditions.checkState(!broadcastElementsState.iterator().hasNext());
+                       
Preconditions.checkState(!unionElementsState.get().iterator().hasNext());
+               }
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               final int numSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+               final int thisSubtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+
+               // store total number of subtasks as broadcast state
+               lastNumSubtasksBroadcastState.clear();
+               lastNumSubtasksBroadcastState.put(LAST_NUM_SUBTASKS_STATE_KEY, 
numSubtasks);
+
+               // populate broadcast state (identical across all subtasks)
+               broadcastElementsState.clear();
+               for (int i = 0; i < numSubtasks; i++) {
+                       broadcastElementsState.put(i, 
getBroadcastStateEntryValue(i));
+               }
+
+               // each subtask only stores its own subtask index as a subset 
of the union set
+               unionElementsState.clear();
+               unionElementsState.add(thisSubtaskIndex);
+       }
+
+       private String getBroadcastStateEntryValue(int thisSubtaskIndex) {
+               return BROADCAST_STATE_ENTRY_VALUE_PREFIX + thisSubtaskIndex;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh 
b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index b19aa61..2060a87 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -17,6 +17,25 @@
 # limitations under the License.
 
################################################################################
 
+################################################################################
+# This end-to-end test verifies that manually taking a savepoint of a running
+# job and resuming from it works properly. It allows resuming the job with
+# a different parallelism than the original execution.
+#
+# Using the general purpose DataStream job, the test covers savepointing and
+# resuming when using different state backends (file, RocksDB), as well as the
+# following types of states:
+#  - Operator re-partitionable list state
+#  - Broadcast state
+#  - Union state
+#  - Keyed state (ValueState)
+#
+# The general purpose DataStream job is self-verifiable, such that if any
+# unexpected error occurs during savepoints or restores, exceptions will be
+# thrown; if exactly-once is violated, alerts will be sent to output (and
+# caught by the test script to fail the job).
+################################################################################
+
 if [ -z $1 ] || [ -z $2 ]; then
   echo "Usage: ./test_resume_savepoint.sh <original_dop> <new_dop> 
<state_backend_setting> <state_backend_file_async_setting>"
   exit 1

Reply via email to