[FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose DataStream job
This closes #5941. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef6e40f0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef6e40f0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef6e40f0 Branch: refs/heads/release-1.5 Commit: ef6e40f008e9a25eb2ebbe86ef256cd4bf254663 Parents: 0d5d086 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Apr 30 18:05:46 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:49:05 2018 +0800 ---------------------------------------------------------------------- .../tests/DataStreamAllroundTestJobFactory.java | 7 + .../tests/DataStreamAllroundTestProgram.java | 28 +++- .../ArtificalOperatorStateMapper.java | 159 +++++++++++++++++++ .../test-scripts/test_resume_savepoint.sh | 19 +++ 4 files changed, 205 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 2577460..c2e4cf5 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper; import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper; import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder; import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder; @@ -285,6 +286,12 @@ class DataStreamAllroundTestJobFactory { return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders); } + static <IN, OUT> ArtificalOperatorStateMapper<IN, OUT> createArtificialOperatorStateMapper( + MapFunction<IN, OUT> mapFunction) { + + return new ArtificalOperatorStateMapper<>(mapFunction); + } + static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder( JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState, TypeSerializer<STATE> typeSerializer) { http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index 75c14e5..5ae1d16 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.tests.artificialstate.ComplexPayload; @@ -29,6 +30,7 @@ import org.apache.flink.streaming.tests.artificialstate.ComplexPayload; import java.util.Collections; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; @@ -43,13 +45,17 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory. * <li>A generic Kryo input type.</li> * <li>A state type for which we register a {@link KryoSerializer}.</li> * <li>Operators with {@link ValueState}.</li> + * <li>Operators with union state.</li> + * <li>Operators with broadcast state.</li> * </ul> * * <p>The cli job configuration options are described in {@link DataStreamAllroundTestJobFactory}. 
* */ public class DataStreamAllroundTestProgram { - private static final String STATE_OPER_NAME = "ArtificalKeyedStateMapper"; + private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper"; + private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper"; + private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper"; public static void main(String[] args) throws Exception { final ParameterTool pt = ParameterTool.fromArgs(args); @@ -58,7 +64,7 @@ public class DataStreamAllroundTestProgram { setupEnvironment(env, pt); - env.addSource(createEventSource(pt)) + DataStream<Event> eventStream = env.addSource(createEventSource(pt)) .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) .keyBy(Event::getKey) .map(createArtificialKeyedStateMapper( @@ -66,20 +72,26 @@ public class DataStreamAllroundTestProgram { (MapFunction<Event, Event>) in -> in, // state is verified and updated per event as a wrapped ComplexPayload state object (Event first, ComplexPayload second) -> { - if (second != null && !second.getStrPayload().equals(STATE_OPER_NAME)) { + if (second != null && !second.getStrPayload().equals(KEYED_STATE_OPER_NAME)) { System.out.println("State is set or restored incorrectly"); } - return new ComplexPayload(first, STATE_OPER_NAME); + return new ComplexPayload(first, KEYED_STATE_OPER_NAME); }, Collections.singletonList( new KryoSerializer<>(ComplexPayload.class, env.getConfig())) ) ) - .name(STATE_OPER_NAME) - .returns(Event.class) - .keyBy(Event::getKey) + .name(KEYED_STATE_OPER_NAME) + .returns(Event.class); + + DataStream<Event> eventStream2 = eventStream + .map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in)) + .name(OPERATOR_STATE_OPER_NAME) + .returns(Event.class); + + eventStream2.keyBy(Event::getKey) .flatMap(createSemanticsCheckMapper(pt)) - .name("SemanticsCheckMapper") + .name(SEMANTICS_CHECK_MAPPER_NAME) .addSink(new PrintSinkFunction<>()); env.execute("General 
purpose test job"); http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java new file mode 100644 index 0000000..61501ea --- /dev/null +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificalOperatorStateMapper.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.artificialstate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.util.Preconditions; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A self-verifiable {@link RichMapFunction} used to verify checkpointing and restore semantics for various + * kinds of operator state. + * + * <p>For verifying broadcast state, each subtask stores as broadcast state a map of (Integer, String) entries, + * key being the subtask index, and value being a String that corresponds to the subtask index. The total number + * of subtasks is also stored as broadcast state. On restore, each subtask should be restored with exactly the same + * broadcast state, with one entry for each subtask in the previous run. + * + * <p>For verifying union state, each subtask of this operator stores its own subtask index as a subset of the whole + * union state. On restore, each subtask's restored union state should have one entry for each subtask in the previous + * run. + * + * <p>All input elements to the operator are simply passed through a user-provided map function and emitted. 
+ */ +public class ArtificalOperatorStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction { + + private static final long serialVersionUID = -1741298597425077761L; + + // ============================================================================ + // State names + // ============================================================================ + + private static final String LAST_NUM_SUBTASKS_STATE_NAME = "lastNumSubtasksState"; + private static final String BROADCAST_STATE_NAME = "broadcastState"; + private static final String UNION_STATE_NAME = "unionState"; + + // ============================================================================ + // Keys used in broadcast states + // ============================================================================ + + private static final String LAST_NUM_SUBTASKS_STATE_KEY = "lastNumSubtasks"; + private static final String BROADCAST_STATE_ENTRY_VALUE_PREFIX = "broadcastStateEntry-"; + + private final MapFunction<IN, OUT> mapFunction; + + private transient BroadcastState<String, Integer> lastNumSubtasksBroadcastState; + + private transient BroadcastState<Integer, String> broadcastElementsState; + private transient ListState<Integer> unionElementsState; + + public ArtificalOperatorStateMapper(MapFunction<IN, OUT> mapFunction) { + this.mapFunction = Preconditions.checkNotNull(mapFunction); + } + + @Override + public OUT map(IN value) throws Exception { + return mapFunction.map(value); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.lastNumSubtasksBroadcastState = context.getOperatorStateStore() + .getBroadcastState(new MapStateDescriptor<>(LAST_NUM_SUBTASKS_STATE_NAME, StringSerializer.INSTANCE, IntSerializer.INSTANCE)); + + this.broadcastElementsState = context.getOperatorStateStore() + .getBroadcastState(new MapStateDescriptor<>(BROADCAST_STATE_NAME, IntSerializer.INSTANCE, StringSerializer.INSTANCE)); + + 
this.unionElementsState = context.getOperatorStateStore() + .getUnionListState(new ListStateDescriptor<>(UNION_STATE_NAME, IntSerializer.INSTANCE)); + + if (context.isRestored()) { + Integer lastNumSubtasks = lastNumSubtasksBroadcastState.get(LAST_NUM_SUBTASKS_STATE_KEY); + Preconditions.checkState(lastNumSubtasks != null); + + // -- verification for broadcast state -- + Set<Integer> expected = new HashSet<>(); + for (int i = 0; i < lastNumSubtasks; i++) { + expected.add(i); + } + + for (Map.Entry<Integer, String> broadcastElementEntry : broadcastElementsState.entries()) { + int key = broadcastElementEntry.getKey(); + Preconditions.checkState(expected.remove(key), "Unexpected keys in restored broadcast state."); + Preconditions.checkState(broadcastElementEntry.getValue().equals(getBroadcastStateEntryValue(key)), "Incorrect value in restored broadcast state."); + } + + Preconditions.checkState(expected.size() == 0, "Missing keys in restored broadcast state."); + + // -- verification for union state -- + for (int i = 0; i < lastNumSubtasks; i++) { + expected.add(i); + } + + for (Integer subtaskIndex : unionElementsState.get()) { + Preconditions.checkState(expected.remove(subtaskIndex), "Unexpected element in restored union state."); + } + Preconditions.checkState(expected.size() == 0, "Missing elements in restored union state."); + } else { + // verify that the broadcast / union state is actually empty if this is not a restored run, as told by the state context; + // this catches incorrect logic with the context.isRestored() when using broadcast state / union state. 
+ + Preconditions.checkState(!lastNumSubtasksBroadcastState.iterator().hasNext()); + Preconditions.checkState(!broadcastElementsState.iterator().hasNext()); + Preconditions.checkState(!unionElementsState.get().iterator().hasNext()); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + final int thisSubtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + + // store total number of subtasks as broadcast state + lastNumSubtasksBroadcastState.clear(); + lastNumSubtasksBroadcastState.put(LAST_NUM_SUBTASKS_STATE_KEY, numSubtasks); + + // populate broadcast state (identical across all subtasks) + broadcastElementsState.clear(); + for (int i = 0; i < numSubtasks; i++) { + broadcastElementsState.put(i, getBroadcastStateEntryValue(i)); + } + + // each subtask only stores its own subtask index as a subset of the union set + unionElementsState.clear(); + unionElementsState.add(thisSubtaskIndex); + } + + private String getBroadcastStateEntryValue(int thisSubtaskIndex) { + return BROADCAST_STATE_ENTRY_VALUE_PREFIX + thisSubtaskIndex; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ef6e40f0/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh index b19aa61..2060a87 100755 --- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh +++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh @@ -17,6 +17,25 @@ # limitations under the License. 
################################################################################ +################################################################################ +# This end-to-end test verifies that manually taking a savepoint of a running +# job and resuming from it works properly. It allows resuming the job with +# a different parallelism than the original execution. +# +# Using the general purpose DataStream job, the test covers savepointing and +# resuming when using different state backends (file, RocksDB), as well as the +# following types of states: +# - Operator re-partitionable list state +# - Broadcast state +# - Union state +# - Keyed state (ValueState) +# +# The general purpose DataStream job is self-verifiable, such that if any +# unexpected error occurs during savepoints or restores, exceptions will be +# thrown; if exactly-once is violated, alerts will be sent to output (and +# caught by the test script to fail the job). +################################################################################ + if [ -z $1 ] || [ -z $2 ]; then echo "Usage: ./test_resume_savepoint.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting>" exit 1
