This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbf731869adeaf7d7183d6d7299a0d1406afd1bc
Author: Yun Tang <[email protected]>
AuthorDate: Sun May 17 21:30:30 2020 +0800

    [FLINK-8871][checkpoint][tests] Add ITcase for NotifiCheckpointAborted 
mechanism
---
 .../runtime/tasks/ExceptionallyDoneFuture.java     |   2 +-
 .../NotifyCheckpointAbortedITCase.java             | 449 +++++++++++++++++++++
 2 files changed, 450 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
index 55bfc18..f95ec5c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
  *
  * @param <V> type of the RunnableFuture
  */
-class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
+public class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
 
        private final Throwable throwable;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
new file mode 100644
index 0000000..e7e6e5c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integrated tests to verify the logic to notify checkpoint aborted via RPC 
message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+       private static final long DECLINE_CHECKPOINT_ID = 2L;
+       private static final long TEST_TIMEOUT = 60000;
+       private static final String DECLINE_SINK_NAME = "DeclineSink";
+       private static MiniClusterWithClientResource cluster;
+
+       private static Path checkpointPath;
+
+       @Parameterized.Parameter
+       public boolean unalignedCheckpointEnabled;
+
+       @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+       public static Collection<Boolean> parameter() {
+               return Arrays.asList(true, false);
+       }
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+
+               checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(1).build());
+               cluster.before();
+
+               NormalMap.reset();
+               DeclineSink.reset();
+               TestingCompletedCheckpointStore.reset();
+       }
+
+       @After
+       public void shutdown() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+
+       }
+
+       /**
+        * Verify operators would be notified as checkpoint aborted.
+        *
+        * <p>The job would run with at least two checkpoints. The 1st 
checkpoint would fail due to add checkpoint to store,
+        * and the 2nd checkpoint would decline by async checkpoint phase of 
'DeclineSink'.
+        *
+        * <p>The job graph looks like:
+        * NormalSource --> keyBy --> NormalMap --> DeclineSink
+        */
+       @Test(timeout = TEST_TIMEOUT)
+       public void testNotifyCheckpointAborted() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+               
env.getCheckpointConfig().enableUnalignedCheckpoints(unalignedCheckpointEnabled);
+               
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
+               env.disableOperatorChaining();
+               env.setParallelism(1);
+
+               final StateBackend failingStateBackend = new 
DeclineSinkFailingStateBackend(checkpointPath);
+               env.setStateBackend(failingStateBackend);
+
+               env.addSource(new NormalSource()).name("NormalSource")
+                       .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) 
value -> value.f0)
+                       .transform("NormalMap", 
TypeInformation.of(Integer.class), new NormalMap())
+                       .transform(DECLINE_SINK_NAME, 
TypeInformation.of(Object.class), new DeclineSink());
+
+               final ClusterClient<?> clusterClient = 
cluster.getClusterClient();
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               JobID jobID = jobGraph.getJobID();
+
+               ClientUtils.submitJob(clusterClient, jobGraph);
+
+               TestingCompletedCheckpointStore.addCheckpointLatch.await();
+               TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
+
+               verifyAllOperatorsNotifyAborted();
+               resetAllOperatorsNotifyAbortedLatches();
+               verifyAllOperatorsNotifyAbortedTimes(1);
+
+               DeclineSink.waitLatch.trigger();
+               verifyAllOperatorsNotifyAborted();
+               verifyAllOperatorsNotifyAbortedTimes(2);
+
+               clusterClient.cancel(jobID).get();
+       }
+
+       private void verifyAllOperatorsNotifyAborted() throws 
InterruptedException {
+               NormalMap.notifiedAbortedLatch.await();
+               DeclineSink.notifiedAbortedLatch.await();
+       }
+
+       private void resetAllOperatorsNotifyAbortedLatches() {
+               NormalMap.notifiedAbortedLatch.reset();
+               DeclineSink.notifiedAbortedLatch.reset();
+       }
+
+       private void verifyAllOperatorsNotifyAbortedTimes(int expectedTimes) {
+               assertEquals(expectedTimes, 
NormalMap.notifiedAbortedTimes.get());
+               assertEquals(expectedTimes, 
DeclineSink.notifiedAbortedTimes.get());
+       }
+
+       /**
+        * Normal source function.
+        */
+       private static class NormalSource implements 
SourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+               protected volatile boolean running;
+
+               NormalSource() {
+                       this.running = true;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
+                       while (running) {
+                               synchronized (ctx.getCheckpointLock()) {
+                                       
ctx.collect(Tuple2.of(ThreadLocalRandom.current().nextInt(), 
ThreadLocalRandom.current().nextInt()));
+                               }
+                               Thread.sleep(10);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       this.running = false;
+               }
+       }
+
+       private static class NormalMap extends StreamMap<Tuple2<Integer, 
Integer>, Integer> {
+               private static final long serialVersionUID = 1L;
+               private static final OneShotLatch notifiedAbortedLatch = new 
OneShotLatch();
+               private static final AtomicInteger notifiedAbortedTimes = new 
AtomicInteger(0);
+
+               public NormalMap() {
+                       super(new NormalMapFunction());
+               }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+                       notifiedAbortedTimes.incrementAndGet();
+                       notifiedAbortedLatch.trigger();
+               }
+
+               static void reset() {
+                       notifiedAbortedLatch.reset();
+                       notifiedAbortedTimes.set(0);
+               }
+       }
+
+       /**
+        * Normal map function.
+        */
+       private static class NormalMapFunction implements 
MapFunction<Tuple2<Integer, Integer>, Integer>, CheckpointedFunction {
+               private static final long serialVersionUID = 1L;
+               private ValueState<Integer> valueState;
+
+               @Override
+               public Integer map(Tuple2<Integer, Integer> value) throws 
Exception {
+                       valueState.update(value.f1);
+                       return value.f1;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) {
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext 
context) throws Exception {
+                       valueState = context.getKeyedStateStore().getState(new 
ValueStateDescriptor<>("value", Integer.class));
+               }
+       }
+
+       /**
+        * A decline sink.
+        */
+       private static class DeclineSink extends StreamSink<Integer> {
+               private static final long serialVersionUID = 1L;
+               private static final OneShotLatch notifiedAbortedLatch = new 
OneShotLatch();
+               private static final OneShotLatch waitLatch = new 
OneShotLatch();
+               private static final AtomicInteger notifiedAbortedTimes = new 
AtomicInteger(0);
+
+               public DeclineSink() {
+                       super(new SinkFunction<Integer>() {
+                               private static final long serialVersionUID = 1L;
+                       });
+               }
+
+               @Override
+               public void snapshotState(StateSnapshotContext context) throws 
Exception {
+                       if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) 
{
+                               DeclineSink.waitLatch.await();
+                       }
+                       super.snapshotState(context);
+               }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+                       notifiedAbortedTimes.incrementAndGet();
+                       notifiedAbortedLatch.trigger();
+               }
+
+               static void reset() {
+                       notifiedAbortedLatch.reset();
+                       waitLatch.reset();
+                       notifiedAbortedTimes.set(0);
+               }
+
+       }
+
+       /**
+        * The snapshot strategy to create failing runnable future at the 
checkpoint to decline.
+        */
+       private static class DeclineSinkFailingSnapshotStrategy extends 
AbstractSnapshotStrategy<OperatorStateHandle> {
+
+               protected DeclineSinkFailingSnapshotStrategy() {
+                       super("StuckAsyncSnapshotStrategy");
+               }
+
+               @Override
+               public RunnableFuture<SnapshotResult<OperatorStateHandle>> 
snapshot(
+                       long checkpointId, long timestamp, @Nonnull 
CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions 
checkpointOptions) {
+                       if (checkpointId == DECLINE_CHECKPOINT_ID) {
+                               return ExceptionallyDoneFuture.of(new 
ExpectedTestException());
+                       } else {
+                               return DoneFuture.of(SnapshotResult.empty());
+                       }
+               }
+       }
+
+       /**
+        * The operator statebackend to create {@link 
DeclineSinkFailingSnapshotStrategy} at {@link DeclineSink}.
+        */
+       private static class DeclineSinkFailingOperatorStateBackend extends 
DefaultOperatorStateBackend {
+
+               public DeclineSinkFailingOperatorStateBackend(
+                       ExecutionConfig executionConfig,
+                       CloseableRegistry closeStreamOnCancelRegistry,
+                       AbstractSnapshotStrategy<OperatorStateHandle> 
snapshotStrategy) {
+                       super(executionConfig,
+                               closeStreamOnCancelRegistry,
+                               new HashMap<>(),
+                               new HashMap<>(),
+                               new HashMap<>(),
+                               new HashMap<>(),
+                               snapshotStrategy);
+               }
+       }
+
+       /**
+        * The state backend to create {@link 
DeclineSinkFailingOperatorStateBackend} at {@link DeclineSink}.
+        */
+       private static class DeclineSinkFailingStateBackend extends 
FsStateBackend {
+               private static final long serialVersionUID = 1L;
+
+               public DeclineSinkFailingStateBackend(Path checkpointDataUri) {
+                       super(checkpointDataUri);
+               }
+
+               @Override
+               public DeclineSinkFailingStateBackend configure(ReadableConfig 
config, ClassLoader classLoader) {
+                       return new 
DeclineSinkFailingStateBackend(checkpointPath);
+               }
+
+               @Override
+               public OperatorStateBackend createOperatorStateBackend(
+                       Environment env,
+                       String operatorIdentifier,
+                       @Nonnull Collection<OperatorStateHandle> stateHandles,
+                       CloseableRegistry cancelStreamRegistry) throws 
BackendBuildingException {
+                       if (operatorIdentifier.contains(DECLINE_SINK_NAME)) {
+                               return new 
DeclineSinkFailingOperatorStateBackend(
+                                       env.getExecutionConfig(),
+                                       cancelStreamRegistry,
+                                       new 
DeclineSinkFailingSnapshotStrategy());
+                       } else {
+                               return new DefaultOperatorStateBackendBuilder(
+                                       env.getUserClassLoader(),
+                                       env.getExecutionConfig(),
+                                       false,
+                                       stateHandles,
+                                       cancelStreamRegistry).build();
+                       }
+               }
+       }
+
+       private static class TestingHaServices extends EmbeddedHaServices {
+               private final CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+
+               TestingHaServices(CheckpointRecoveryFactory 
checkpointRecoveryFactory, Executor executor) {
+                       super(executor);
+                       this.checkpointRecoveryFactory = 
checkpointRecoveryFactory;
+               }
+
+               @Override
+               public CheckpointRecoveryFactory getCheckpointRecoveryFactory() 
{
+                       return checkpointRecoveryFactory;
+               }
+       }
+
+       /**
+        * An extension of {@link StandaloneCompletedCheckpointStore}.
+        */
+       private static class TestingCompletedCheckpointStore extends 
StandaloneCompletedCheckpointStore {
+               private static final OneShotLatch addCheckpointLatch = new 
OneShotLatch();
+               private static final OneShotLatch abortCheckpointLatch = new 
OneShotLatch();
+
+               TestingCompletedCheckpointStore() {
+                       super(1);
+               }
+
+               @Override
+               public void addCheckpoint(CompletedCheckpoint checkpoint) 
throws Exception {
+                       if (abortCheckpointLatch.isTriggered()) {
+                               super.addCheckpoint(checkpoint);
+                       } else {
+                               // tell main thread that all checkpoints on 
task side have been finished.
+                               addCheckpointLatch.trigger();
+                               // wait for the main thread to throw exception 
so that the checkpoint would be notified as aborted.
+                               abortCheckpointLatch.await();
+                               throw new ExpectedTestException();
+                       }
+               }
+
+               static void reset() {
+                       addCheckpointLatch.reset();
+                       abortCheckpointLatch.reset();
+               }
+       }
+
+       /**
+        * Testing HA factory which needs to be public in order to be 
instantiatable.
+        */
+       public static class TestingHAFactory implements 
HighAvailabilityServicesFactory {
+
+               @Override
+               public HighAvailabilityServices createHAServices(Configuration 
configuration, Executor executor) {
+                       return new TestingHaServices(
+                               new TestingCheckpointRecoveryFactory(new 
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+                               executor);
+               }
+       }
+
+}

Reply via email to