[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423869#comment-16423869 ]
ASF GitHub Bot commented on FLINK-8910:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5676#discussion_r178790739
--- Diff: flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool pt = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(pt.getInt("parallelism", 1));
+ env.setMaxParallelism(pt.getInt("maxParallelism", pt.getInt("parallelism", 1)));
+ env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
+ if (pt.has("externalizedCheckpoints") && pt.getBoolean("externalizedCheckpoints", false)) {
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ }
+
+ String stateBackend = pt.get("stateBackend", "file");
+ String checkpointDir = pt.getRequired("checkpointDir");
+
+ boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+ if ("file".equals(stateBackend)) {
+ boolean asyncCheckpoints =
pt.getBoolean("asyncCheckpoints", false);
+ env.setStateBackend(new FsStateBackend(checkpointDir,
asyncCheckpoints));
+ } else if ("rocks".equals(stateBackend)) {
+ boolean incrementalCheckpoints =
pt.getBoolean("incrementalCheckpoints", false);
+ env.setStateBackend(new
RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ } else {
+ throw new IllegalArgumentException("Unknown backend: "
+ stateBackend);
+ }
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(pt);
+
+ // delay to throttle down the production of the source
+ long delay = pt.has("delay") ? pt.getLong("delay") : 0L;
+
+ // the maximum number of attempts before the job finishes with success
+ int maxAttempts = pt.has("maxAttempts") ? pt.getInt("maxAttempts") : 3;
+
+ // size of one artificial value
+ int valueSize = pt.has("valueSize") ? pt.getInt("valueSize") : 10;
+
+ env.addSource(new RandomLongSource(maxAttempts, delay))
+ .keyBy((KeySelector<Long, Long>) aLong -> aLong)
+ .flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
+ .map((MapFunction<String, String>) value -> value)
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("Sticky Allocation And Local Recovery Test");
+ }
+
+ /**
+ * Source function that produces a long sequence.
+ */
+ private static final class RandomLongSource extends RichSourceFunction<Long> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Generator delay between two events.
+ */
+ final long delay;
+
+ /**
+ * Maximum restarts before shutting down this source.
+ */
+ final int maxAttempts;
+
+ /**
+ * State that holds the current key for recovery.
+ */
+ ListState<Long> sourceCurrentKeyState;
+
+ /**
+ * Generator's current key.
+ */
+ long currentKey;
+
+ /**
+ * Generator runs while this is true.
+ */
+ volatile boolean running;
+
+ RandomLongSource(int maxAttempts, long delay) {
+ this.delay = delay;
+ this.maxAttempts = maxAttempts;
+ this.running = true;
+ }
+
+ @Override
+ public void run(SourceContext<Long> sourceContext) throws Exception {
+
+ int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ // the source emits one final event and shuts down once we have reached max attempts.
+ if (getRuntimeContext().getAttemptNumber() > maxAttempts) {
+ sourceContext.collect(Long.MAX_VALUE - subtaskIdx);
+ return;
+ }
+
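+ // Each subtask starts from its own subtask index and advances by the parallelism, so the subtasks emit disjoint, strictly increasing key sequences.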
+ while (running) {
+ sourceContext.collect(currentKey);
+ currentKey += numberOfParallelSubtasks;
+
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ sourceCurrentKeyState.clear();
+ sourceCurrentKeyState.add(currentKey);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+
+ ListStateDescriptor<Long> currentKeyDescriptor = new ListStateDescriptor<>("currentKey", Long.class);
+ sourceCurrentKeyState = context.getOperatorStateStore().getListState(currentKeyDescriptor);
+
+ currentKey = getRuntimeContext().getIndexOfThisSubtask();
+ Iterable<Long> iterable = sourceCurrentKeyState.get();
+ if (iterable != null) {
+ Iterator<Long> iterator = iterable.iterator();
+ if (iterator.hasNext()) {
+ currentKey = iterator.next();
+ Preconditions.checkState(!iterator.hasNext());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stateful map function. Failure creation and checks happen here.
+ */
+ private static final class StateCreatingFlatMap
+ extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * User configured size of the generated artificial values in the keyed state.
+ */
+ final int valueSize;
+
+ /**
+ * Holds the user configuration of whether the artificial test failure kills the JVM.
+ */
+ final boolean killTaskOnFailure;
+
+ /**
+ * This state is used to create artificial keyed state in the backend.
+ */
+ transient ValueState<String> valueState;
+
+ /**
+ * This state is used to persist the schedulingAndFailureInfo.
+ */
+ transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState;
+
+ /**
+ * This contains the current scheduling and failure metadata.
+ */
+ transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo;
+
+ /**
+ * Message to indicate that recovery detected a failure with sticky allocation.
+ */
+ transient volatile String allocationFailureMessage;
+
+ /**
+ * If this flag is true, the next invocation of the map function introduces a test failure.
+ */
+ transient volatile boolean failTask;
+
+ StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+ this.valueSize = valueSize;
+ this.failTask = false;
+ this.killTaskOnFailure = killTaskOnFailure;
+ this.allocationFailureMessage = null;
+ }
+
+ @Override
+ public void flatMap(Long key, Collector<String> collector) throws IOException {
+
+ if (allocationFailureMessage != null) {
+ // Report the failure downstream, so that we can get the message from the output.
+ collector.collect(allocationFailureMessage);
+ allocationFailureMessage = null;
+ }
+
+ if (failTask) {
+ // we fail the task, either by killing the JVM hard, or by throwing a user code exception.
+ if (killTaskOnFailure) {
+ Runtime.getRuntime().halt(-1);
+ } else {
+ throw new RuntimeException("Artificial user code exception.");
+ }
+ }
+
+ // sanity check
+ if (null != valueState.value()) {
+ throw new IllegalStateException("This should never happen, keys are generated monotonically.");
+ }
+
+ // store artificial data to blow up the state
+ valueState.update(RandomStringUtils.random(valueSize, true, true));
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+
+ ValueStateDescriptor<String> stateDescriptor =
+ new ValueStateDescriptor<>("state", String.class);
+ valueState = functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+ ListStateDescriptor<MapperSchedulingAndFailureInfo> mapperInfoStateDescriptor =
+ new ListStateDescriptor<>("mapperState", MapperSchedulingAndFailureInfo.class);
+ schedulingAndFailureState =
+ functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+ String allocationID = runtimeContext.getAllocationID();
+
+ final int thisJvmPid = getJvmPid();
+ final Set<Integer> killedJvmPids = new HashSet<>();
+
+ // here we check if the sticky scheduling worked as expected
+ if (functionInitializationContext.isRestored()) {
+ Iterable<MapperSchedulingAndFailureInfo> iterable = schedulingAndFailureState.get();
+ String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
+
+ MapperSchedulingAndFailureInfo infoForThisTask = null;
+ List<MapperSchedulingAndFailureInfo> completeInfo = new ArrayList<>();
+ if (iterable != null) {
+ for (MapperSchedulingAndFailureInfo testInfo : iterable) {
+
+ completeInfo.add(testInfo);
+
+ if (taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+ infoForThisTask = testInfo;
+ }
+
+ if (testInfo.killedJvm) {
+ killedJvmPids.add(testInfo.jvmPid);
+ }
+ }
+ }
+
+ Preconditions.checkNotNull(infoForThisTask, "Expected to find info here.");
+
+ if (!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) {
+ allocationFailureMessage = String.format(
+ "Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s " +
+ "on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\n" +
+ "Complete information from before the crash: %s.",
+ runtimeContext.getTaskNameWithSubtasks(),
+ runtimeContext.getAttemptNumber(),
+ infoForThisTask.allocationId,
+ infoForThisTask.jvmPid,
+ allocationID,
+ thisJvmPid,
+ completeInfo);
+ }
+ }
+
+ // We determine which of the subtasks will produce the artificial failure
+ boolean failingTask = shouldTaskFailForThisAttempt();
+
+ // We take note of all the meta info that we require to check sticky scheduling in the next re-attempt
+ this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(
+ failingTask,
+ failingTask && killTaskOnFailure,
+ thisJvmPid,
+ runtimeContext.getTaskNameWithSubtasks(),
+ allocationID);
+
+ schedulingAndFailureState.clear();
+ schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // we can only fail the task after at least one checkpoint is completed to record progress.
+ failTask = currentSchedulingAndFailureInfo.failingTask;
+ }
+
+ private boolean shouldTaskFailForThisAttempt() {
+ RuntimeContext runtimeContext = getRuntimeContext();
+ int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+ int attempt = runtimeContext.getAttemptNumber();
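+ // Rotate the failing subtask with the attempt number, so a different subtask produces the failure on each restart.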
+ return (attempt % numSubtasks) == subtaskIdx;
+ }
+
+ private boolean isScheduledToCorrectAllocation(
+ MapperSchedulingAndFailureInfo infoForThisTask,
+ String allocationID,
+ Set<Integer> killedJvmPids) {
+
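+ // The allocation is considered correct if the task came back on its previous allocation, or if its previous JVM was deliberately killed (that slot is gone, so rescheduling elsewhere is expected).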
+ return (infoForThisTask.allocationId.equals(allocationID)
+ || killedJvmPids.contains(infoForThisTask.jvmPid));
+ }
+ }
+
+ private static int getJvmPid() throws Exception {
+ java.lang.management.RuntimeMXBean runtime =
+ java.lang.management.ManagementFactory.getRuntimeMXBean();
+ java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
+ jvm.setAccessible(true);
+ sun.management.VMManagement mgmt =
+ (sun.management.VMManagement) jvm.get(runtime);
+ java.lang.reflect.Method pidMethod =
+ mgmt.getClass().getDeclaredMethod("getProcessId");
+ pidMethod.setAccessible(true);
--- End diff --
👍 added attributions. In case of license problems, I wonder how this would
work for such trivial code that is just calling three getters in an API?
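As an aside (not part of the PR): the PID can also be read without the sun.management reflection. A minimal sketch, assuming the HotSpot convention that RuntimeMXBean#getName() returns "<pid>@<hostname>"; the method name below is made up for illustration, and on Java 9+ ProcessHandle.current().pid() would avoid even that assumption:

    private static int getJvmPidPortable() {
        // Relies on the HotSpot convention that the runtime name looks like "<pid>@<hostname>".
        String name = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
        return Integer.parseInt(name.substring(0, name.indexOf('@')));
    }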
> Introduce automated end-to-end test for local recovery (including sticky
> scheduling)
> ------------------------------------------------------------------------------------
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check
> that sticky allocation and local recovery work as expected.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)