This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1c653ceb25b456a0abe65b22b2eada17ba2bed53 Author: Gary Yao <[email protected]> AuthorDate: Tue Jul 9 15:21:59 2019 +0200 [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test - Introduce new test job (DataSetFineGrainedRecoveryTestProgram), which waits for an external file to be created before finishing. - Introduce killing of TMs to HA data set test. - Reduce JM kills to 2. - Reduce heartbeat interval and timeout to speed up TM loss detection. This closes #9060. --- .../pom.xml | 89 ++++++++++++++ .../tests/BlockingIncrementingMapFunction.java | 60 ++++++++++ .../DataSetFineGrainedRecoveryTestProgram.java | 48 ++++++++ .../batch/tests/util/FileBasedOneShotLatch.java | 129 +++++++++++++++++++++ .../tests/util/FileBasedOneShotLatchTest.java | 92 +++++++++++++++ flink-end-to-end-tests/pom.xml | 1 + .../test-scripts/test_ha_dataset.sh | 59 +++++++--- 7 files changed, 463 insertions(+), 15 deletions(-) diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml new file mode 100644 index 0000000..ecde754 --- /dev/null +++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-dataset-fine-grained-recovery-test</artifactId> + <name>flink-dataset-fine-grained-recovery-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>dataset-fine-grained-recovery</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <includes> + <include>**/*Test.*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>DataSetFineGrainedRecoveryTestProgram</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>DataSetFineGrainedRecoveryTestProgram</finalName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.batch.tests.DataSetFineGrainedRecoveryTestProgram</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git 
a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java new file mode 100644 index 0000000..c5d92b8 --- /dev/null +++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.batch.tests.util.FileBasedOneShotLatch; +import org.apache.flink.configuration.Configuration; + +import java.nio.file.Paths; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A map function that increments values by one. + * + * <p>Processing of elements is held until a latch file is created.
+ */ +public class BlockingIncrementingMapFunction extends RichMapFunction<Long, Long> { + + private final String latchFilePath; + + private transient FileBasedOneShotLatch latch; + + /** + * Creates a map function that holds back every element until the given latch file exists. + * + * @param latchFilePath path of the file whose creation releases the latch; must not be null + */ + public BlockingIncrementingMapFunction(final String latchFilePath) { + this.latchFilePath = checkNotNull(latchFilePath); + } + + @Override + public void open(final Configuration parameters) { + latch = new FileBasedOneShotLatch(Paths.get(latchFilePath)); + } + + @Override + public void close() throws Exception { + // open() may have failed or never been called, in which case there is no latch to close + // and an unconditional latch.close() would throw a NullPointerException. + if (latch != null) { + latch.close(); + } + } + + /** + * Blocks until the latch file exists, then returns the input incremented by one. + * + * @throws InterruptedException if interrupted while waiting for the latch file + */ + @Override + public Long map(final Long value) throws InterruptedException { + latch.await(); + return value + 1; + } +} diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java new file mode 100644 index 0000000..749d232 --- /dev/null +++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; + +/** + * Program to test fine grained recovery. + * + * <p>The job maps a generated sequence through {@link BlockingIncrementingMapFunction}, which + * holds back all elements until the latch file is created, and writes the result as text. + * + * <p>Arguments: + * <ul> + * <li>{@code --latchFilePath}: file whose creation releases the map operator (required)</li> + * <li>{@code --outputPath}: path the result is written to (required)</li> + * <li>{@code --parallelism}: default parallelism of the job (optional, defaults to 4)</li> + * </ul> + */ +public class DataSetFineGrainedRecoveryTestProgram { + + private static final int DEFAULT_PARALLELISM = 4; + + public static void main(String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + final String latchFilePath = params.getRequired("latchFilePath"); + final String outputPath = params.getRequired("outputPath"); + // A parallelism set on the environment takes precedence over the one given on the command + // line (-p), so it is read from the program arguments instead of being hard-coded. + final int parallelism = params.getInt("parallelism", DEFAULT_PARALLELISM); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED); + env.setParallelism(parallelism); + + env.generateSequence(0, 1000) + .map(new BlockingIncrementingMapFunction(latchFilePath)) + .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE) + // run the sink as a single task (presumably so that the result ends up in one output file) + .setParallelism(1); + + env.execute(); + } +} diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java new file mode 100644 index 0000000..dee2414 --- /dev/null +++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.batch.tests.util; + +import com.sun.nio.file.SensitivityWatchEventModifier; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A synchronization aid that allows a single thread to wait on the creation of a specified file.
+ * + * <p>The latch watches the parent directory of the latch file for {@code ENTRY_CREATE} events, so + * that directory must exist when the latch is created. Instances must be closed to release the + * underlying {@link WatchService}. + */ +@NotThreadSafe +public class FileBasedOneShotLatch implements Closeable { + + private final Path latchFile; + + private final WatchService watchService; + + private boolean released; + + /** + * Creates a latch that is released once {@code latchFile} exists. + * + * @param latchFile the file to wait for; must not be null and must have a parent directory + * @throws NullPointerException if {@code latchFile} is null or has no parent + * @throws UncheckedIOException if the parent directory cannot be watched + */ + public FileBasedOneShotLatch(final Path latchFile) { + this.latchFile = checkNotNull(latchFile); + + final Path parentDir = checkNotNull(latchFile.getParent(), "latchFile must have a parent"); + this.watchService = initWatchService(parentDir); + } + + private static WatchService initWatchService(final Path parentDir) { + final WatchService watchService = createWatchService(parentDir); + try { + watchForLatchFile(watchService, parentDir); + } catch (RuntimeException e) { + // do not leak the watch service if the directory cannot be registered + try { + watchService.close(); + } catch (IOException closeException) { + e.addSuppressed(closeException); + } + throw e; + } + return watchService; + } + + private static WatchService createWatchService(final Path parentDir) { + try { + return parentDir.getFileSystem().newWatchService(); + } catch (IOException e) { + throw new UncheckedIOException(String.format("Could not create a watch service for %s.", parentDir), e); + } + } + + private static void watchForLatchFile(final WatchService watchService, final Path parentDir) { + try { + parentDir.register( + watchService, + new WatchEvent.Kind<?>[]{StandardWatchEventKinds.ENTRY_CREATE}, + SensitivityWatchEventModifier.HIGH); + } catch (IOException e) { + throw new UncheckedIOException(String.format("Could not watch directory %s.", parentDir), e); + } + } + + /** + * Waits until the latch file is created. + * + * <p>Once this method has returned, subsequent invocations will not block, even if the latch + * file has been deleted in the meantime. Note that this method may block indefinitely if the + * latch file is created and deleted again before this method observes it.
+ * + * @throws InterruptedException if interrupted while waiting + * @throws IllegalStateException if the parent directory can no longer be watched, e.g. because + * it has been deleted + */ + public void await() throws InterruptedException { + if (isReleasedOrReleasable()) { + return; + } + + awaitLatchFile(watchService); + } + + private void awaitLatchFile(final WatchService watchService) throws InterruptedException { + while (true) { + final WatchKey watchKey = watchService.take(); + // The events themselves are irrelevant (the file system is checked directly), but they + // must be retrieved: reset() immediately re-queues a key that still has pending events, + // which would turn this loop into a busy spin after any unrelated file is created in the + // parent directory. + watchKey.pollEvents(); + if (isReleasedOrReleasable()) { + break; + } + if (!watchKey.reset()) { + // the key is no longer valid (e.g. the directory was deleted); no further events will arrive + throw new IllegalStateException(String.format( + "Stopped receiving events for the parent directory of latch file %s.", latchFile)); + } + } + } + + private boolean isReleasedOrReleasable() { + if (released) { + return true; + } + + if (Files.exists(latchFile)) { + releaseLatch(); + return true; + } + + return false; + } + + private void releaseLatch() { + released = true; + } + + @Override + public void close() throws IOException { + watchService.close(); + } +} diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java new file mode 100644 index 0000000..42d5cff --- /dev/null +++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.batch.tests.util; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link FileBasedOneShotLatch}. + */ +public class FileBasedOneShotLatchTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileBasedOneShotLatch latch; + + private File latchFile; + + @Before + public void setUp() { + latchFile = new File(temporaryFolder.getRoot(), "latchFile"); + latch = new FileBasedOneShotLatch(latchFile.toPath()); + } + + @After + public void tearDown() throws Exception { + // release the watch service held by the latch created in setUp() + latch.close(); + } + + @Test + public void awaitReturnsWhenFileIsCreated() throws Exception { + final AtomicBoolean awaitCompleted = new AtomicBoolean(); + final Thread thread = startAwaitingThread(awaitCompleted); + + latchFile.createNewFile(); + thread.join(); + + assertTrue(awaitCompleted.get()); + } + + @Test + public void awaitReturnsWhenFileIsCreatedAfterUnrelatedFile() throws Exception { + final AtomicBoolean awaitCompleted = new AtomicBoolean(); + final Thread thread = startAwaitingThread(awaitCompleted); + + // events for other files in the watched directory must not prevent the latch from being released + new File(temporaryFolder.getRoot(), "unrelatedFile").createNewFile(); + latchFile.createNewFile(); + thread.join(); + + assertTrue(awaitCompleted.get()); + } + + @Test + public void subsequentAwaitDoesNotBlock() throws Exception { + latchFile.createNewFile(); + latch.await(); + latch.await(); + } + + @Test + public void subsequentAwaitDoesNotBlockEvenIfLatchFileIsDeleted() throws Exception { + latchFile.createNewFile(); + latch.await(); + + latchFile.delete(); + latch.await(); + } + + @Test + public void doesNotBlockIfFileExistsPriorToCreatingLatch() throws Exception { + latchFile.createNewFile(); + + try (FileBasedOneShotLatch latchCreatedAfterFile = new FileBasedOneShotLatch(latchFile.toPath())) { + latchCreatedAfterFile.await(); + } + } + + /** + * Starts a thread that awaits {@link #latch} and sets {@code awaitCompleted} once the latch is released. + */ + private Thread startAwaitingThread(final AtomicBoolean awaitCompleted) { + final Thread thread = new Thread(() -> { + try { + latch.await(); + awaitCompleted.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + thread.start(); + return thread; + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 46bfde2..d676291 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -39,6 +39,7 @@ under the
License. <module>flink-parent-child-classloading-test-program</module> <module>flink-parent-child-classloading-test-lib-package</module> <module>flink-dataset-allround-test</module> + <module>flink-dataset-fine-grained-recovery-test</module> <module>flink-datastream-allround-test</module> <module>flink-stream-sql-test</module> <module>flink-bucketing-sink-test</module> diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh index b619e16..b547142 100755 --- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh +++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh @@ -20,7 +20,7 @@ source "$(dirname "$0")"/common.sh source "$(dirname "$0")"/common_ha.sh -TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-fine-grained-recovery-test/target/DataSetFineGrainedRecoveryTestProgram.jar function ha_cleanup() { # kill the cluster and zookeeper @@ -32,30 +32,26 @@ on_exit ha_cleanup function run_ha_test() { local PARALLELISM=$1 - local JM_KILLS=3 + local JM_KILLS=2 + local TM_KILLS=2 - CLEARED=0 - mkdir -p ${TEST_DATA_DIR}/control - touch ${TEST_DATA_DIR}/control/test.txt + # the map operators of the test job block until this file exists, which keeps the job running while JMs and TMs are killed + local LATCH_FILE_PATH=$TEST_DATA_DIR/latchFile - # start the cluster on HA mode - start_ha_cluster + CLEARED=0 + setup_and_start_cluster ${PARALLELISM} echo "Running on HA mode: parallelism=${PARALLELISM}."
# submit a job in detached mode and let it run local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \ $TEST_PROGRAM_JAR \ - --loadFactor 4 \ + --latchFilePath $LATCH_FILE_PATH \ --outputPath $TEST_DATA_DIR/out/dataset_allround \ - --source true \ | grep "Job has been submitted with JobID" | sed 's/.* //g') wait_job_running ${JOB_ID} - # start the watchdog that keeps the number of JMs stable - start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081" - + local c for (( c=0; c<${JM_KILLS}; c++ )); do # kill the JM and wait for watchdog to # create a new one which will take over @@ -63,10 +59,43 @@ function run_ha_test() { wait_job_running ${JOB_ID} done - cancel_job ${JOB_ID} + # kill TM_KILLS task managers, replacing each one before killing the next + for (( c=0; c<${TM_KILLS}; c++ )); do + kill_and_replace_random_task_manager + done + + # create the latch file to release the blocked map operators so that the job can finish + touch ${LATCH_FILE_PATH} + + wait_job_terminal_state ${JOB_ID} "FINISHED" + check_result_hash "DataSet-FineGrainedRecovery-Test" $TEST_DATA_DIR/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b" +} + +function setup_and_start_cluster() { + local NUM_TASK_MANAGERS=$1 + + create_ha_config + + # enable fine grained (region based) failover + set_config_key "jobmanager.execution.failover-strategy" "region" + # NOTE(review): presumably keeps result partitions after consumption so that restarted regions can re-read them; confirm + set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false" + # one slot per TM; the caller passes the job parallelism as the number of TMs to start + set_config_key "taskmanager.numberOfTaskSlots" "1" + + # effectively unlimited restart attempts (2147483647 is Integer.MAX_VALUE) + set_config_key "restart-strategy" "fixed-delay" + set_config_key "restart-strategy.fixed-delay.attempts" "2147483647" + + # reduce heartbeat interval and timeout to speed up TM loss detection + set_config_key "heartbeat.interval" "2000" + set_config_key "heartbeat.timeout" "10000" + + start_local_zk + start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081" + start_taskmanagers ${NUM_TASK_MANAGERS} +} + +function kill_and_replace_random_task_manager() { + # kill a random TM, wait until the TM count has dropped by one, then start a replacement and wait until the original TM count is restored + local NUM_TASK_MANAGERS=$(query_number_of_running_tms) - # do not verify checkpoints in the logs - verify_logs ${JM_KILLS} false + kill_random_taskmanager + wait_for_number_of_running_tms $(( ${NUM_TASK_MANAGERS} - 1 )) + start_taskmanagers 1 + wait_for_number_of_running_tms
${NUM_TASK_MANAGERS} } run_ha_test 4
