This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c653ceb25b456a0abe65b22b2eada17ba2bed53
Author: Gary Yao <[email protected]>
AuthorDate: Tue Jul 9 15:21:59 2019 +0200

    [FLINK-13145][runtime, tests] Enable fine grained failover in E2E HA data set test
    
    - Introduce new test job (DataSetFineGrainedRecoveryTestProgram), which waits
      for an external file to be created before finishing.
    - Introduce killing of TMs to HA data set test.
    - Reduce JM kills to 2.
    - Reduce heartbeat interval and timeout to speed up TM loss detection.
    
    This closes #9060.
---
 .../pom.xml                                        |  89 ++++++++++++++
 .../tests/BlockingIncrementingMapFunction.java     |  60 ++++++++++
 .../DataSetFineGrainedRecoveryTestProgram.java     |  48 ++++++++
 .../batch/tests/util/FileBasedOneShotLatch.java    | 129 +++++++++++++++++++++
 .../tests/util/FileBasedOneShotLatchTest.java      |  92 +++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 .../test-scripts/test_ha_dataset.sh                |  59 +++++++---
 7 files changed, 463 insertions(+), 15 deletions(-)

diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
new file mode 100644
index 0000000..ecde754
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <version>1.10-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-dataset-fine-grained-recovery-test</artifactId>
+       <name>flink-dataset-fine-grained-recovery-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>dataset-fine-grained-recovery</id>
+                                               <phase>test</phase>
+                                               <goals>
+                                                       <goal>test</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               <include>**/*Test.*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>DataSetFineGrainedRecoveryTestProgram</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>DataSetFineGrainedRecoveryTestProgram</finalName>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.batch.tests.DataSetFineGrainedRecoveryTestProgram</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
new file mode 100644
index 0000000..c5d92b8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/BlockingIncrementingMapFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.batch.tests.util.FileBasedOneShotLatch;
+import org.apache.flink.configuration.Configuration;
+
+import java.nio.file.Paths;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A map function that increments values by one.
+ *
+ * <p>Processing of elements is held until a latch file is created.
+ */
+public class BlockingIncrementingMapFunction extends RichMapFunction<Long, Long> {
+
+       private final String latchFilePath;
+
+       private transient FileBasedOneShotLatch latch;
+
+       public BlockingIncrementingMapFunction(final String latchFilePath) {
+               this.latchFilePath = checkNotNull(latchFilePath);
+       }
+
+       @Override
+       public void open(final Configuration parameters) {
+               latch = new FileBasedOneShotLatch(Paths.get(latchFilePath));
+       }
+
+       @Override
+       public void close() throws Exception {
+               latch.close();
+       }
+
+       @Override
+       public Long map(final Long value) throws InterruptedException {
+               latch.await();
+               return value + 1;
+       }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
new file mode 100644
index 0000000..749d232
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/DataSetFineGrainedRecoveryTestProgram.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+
+/**
+ * Program to test fine grained recovery.
+ */
+public class DataSetFineGrainedRecoveryTestProgram {
+
+       public static void main(String[] args) throws Exception {
+               final ParameterTool params = ParameterTool.fromArgs(args);
+               final String latchFilePath = params.getRequired("latchFilePath");
+               final String outputPath = params.getRequired("outputPath");
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
+               env.setParallelism(4);
+
+               env.generateSequence(0, 1000)
+                       .map(new BlockingIncrementingMapFunction(latchFilePath))
+                       .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
+                       .setParallelism(1);
+
+               env.execute();
+       }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
new file mode 100644
index 0000000..dee2414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/main/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatch.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import com.sun.nio.file.SensitivityWatchEventModifier;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A synchronization aid that allows a single thread to wait on the creation of a specified file.
+ */
+@NotThreadSafe
+public class FileBasedOneShotLatch implements Closeable {
+
+       private final Path latchFile;
+
+       private final WatchService watchService;
+
+       private boolean released;
+
+       public FileBasedOneShotLatch(final Path latchFile) {
+               this.latchFile = checkNotNull(latchFile);
+
+               final Path parentDir = checkNotNull(latchFile.getParent(), "latchFile must have a parent");
+               this.watchService = initWatchService(parentDir);
+       }
+
+       private static WatchService initWatchService(final Path parentDir) {
+               final WatchService watchService = createWatchService(parentDir);
+               watchForLatchFile(watchService, parentDir);
+               return watchService;
+       }
+
+       private static WatchService createWatchService(final Path parentDir) {
+               try {
+                       return parentDir.getFileSystem().newWatchService();
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static void watchForLatchFile(final WatchService watchService, final Path parentDir) {
+               try {
+                       parentDir.register(
+                               watchService,
+                               new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE},
+                               SensitivityWatchEventModifier.HIGH);
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       /**
+        * Waits until the latch file is created.
+        *
+        * <p>When this method returns, subsequent invocations will not block even after the latch file
+        * is deleted. Note that this method may not return if the latch file is deleted before this
+        * method returns.
+        *
+        * @throws InterruptedException if interrupted while waiting
+        */
+       public void await() throws InterruptedException {
+               if (isReleasedOrReleasable()) {
+                       return;
+               }
+
+               awaitLatchFile(watchService);
+       }
+
+       private void awaitLatchFile(final WatchService watchService) throws InterruptedException {
+               while (true) {
+                       WatchKey watchKey = watchService.take();
+                       if (isReleasedOrReleasable()) {
+                               break;
+                       }
+                       watchKey.reset();
+               }
+       }
+
+       private boolean isReleasedOrReleasable() {
+               if (released) {
+                       return true;
+               }
+
+               if (Files.exists(latchFile)) {
+                       releaseLatch();
+                       return true;
+               }
+
+               return false;
+       }
+
+       private void releaseLatch() {
+               released = true;
+       }
+
+       @Override
+       public void close() throws IOException {
+               watchService.close();
+       }
+}
diff --git a/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
new file mode 100644
index 0000000..42d5cff
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-fine-grained-recovery-test/src/test/java/org/apache/flink/batch/tests/util/FileBasedOneShotLatchTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.batch.tests.util;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link FileBasedOneShotLatch}.
+ */
+public class FileBasedOneShotLatchTest {
+
+       @Rule
+       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private FileBasedOneShotLatch latch;
+
+       private File latchFile;
+
+       @Before
+       public void setUp() {
+               latchFile = new File(temporaryFolder.getRoot(), "latchFile");
+               latch = new FileBasedOneShotLatch(latchFile.toPath());
+       }
+
+       @Test
+       public void awaitReturnsWhenFileIsCreated() throws Exception {
+               final AtomicBoolean awaitCompleted = new AtomicBoolean();
+               final Thread thread = new Thread(() -> {
+                       try {
+                               latch.await();
+                               awaitCompleted.set(true);
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+               });
+               thread.start();
+
+               latchFile.createNewFile();
+               thread.join();
+
+               assertTrue(awaitCompleted.get());
+       }
+
+       @Test
+       public void subsequentAwaitDoesNotBlock() throws Exception {
+               latchFile.createNewFile();
+               latch.await();
+               latch.await();
+       }
+
+       @Test
+       public void subsequentAwaitDoesNotBlockEvenIfLatchFileIsDeleted() throws Exception {
+               latchFile.createNewFile();
+               latch.await();
+
+               latchFile.delete();
+               latch.await();
+       }
+
+       @Test
+       public void doesNotBlockIfFileExistsPriorToCreatingLatch() throws Exception {
+               latchFile.createNewFile();
+
+               final FileBasedOneShotLatch latch = new FileBasedOneShotLatch(latchFile.toPath());
+               latch.await();
+       }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 46bfde2..d676291 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -39,6 +39,7 @@ under the License.
                <module>flink-parent-child-classloading-test-program</module>
                <module>flink-parent-child-classloading-test-lib-package</module>
                <module>flink-dataset-allround-test</module>
+               <module>flink-dataset-fine-grained-recovery-test</module>
                <module>flink-datastream-allround-test</module>
                <module>flink-stream-sql-test</module>
                <module>flink-bucketing-sink-test</module>
diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b619e16..b547142 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -20,7 +20,7 @@
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/common_ha.sh
 
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-fine-grained-recovery-test/target/DataSetFineGrainedRecoveryTestProgram.jar
 
 function ha_cleanup() {
   # kill the cluster and zookeeper
@@ -32,30 +32,26 @@ on_exit ha_cleanup
 function run_ha_test() {
     local PARALLELISM=$1
 
-    local JM_KILLS=3
+    local JM_KILLS=2
+    local TM_KILLS=2
 
-    CLEARED=0
-    mkdir -p ${TEST_DATA_DIR}/control
-    touch ${TEST_DATA_DIR}/control/test.txt
+    local LATCH_FILE_PATH=$TEST_DATA_DIR/latchFile
 
-    # start the cluster on HA mode
-    start_ha_cluster
+    CLEARED=0
 
+    setup_and_start_cluster ${PARALLELISM}
     echo "Running on HA mode: parallelism=${PARALLELISM}."
 
     # submit a job in detached mode and let it run
     local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
         $TEST_PROGRAM_JAR \
-        --loadFactor 4 \
+        --latchFilePath $LATCH_FILE_PATH \
         --outputPath $TEST_DATA_DIR/out/dataset_allround \
-        --source true \
         | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
     wait_job_running ${JOB_ID}
 
-    # start the watchdog that keeps the number of JMs stable
-    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
-
+    local c
     for (( c=0; c<${JM_KILLS}; c++ )); do
         # kill the JM and wait for watchdog to
         # create a new one which will take over
@@ -63,10 +59,43 @@ function run_ha_test() {
         wait_job_running ${JOB_ID}
     done
 
-    cancel_job ${JOB_ID}
+    for (( c=0; c<${TM_KILLS}; c++ )); do
+        kill_and_replace_random_task_manager
+    done
+
+    touch ${LATCH_FILE_PATH}
+
+    wait_job_terminal_state ${JOB_ID} "FINISHED"
+    check_result_hash "DataSet-FineGrainedRecovery-Test" $TEST_DATA_DIR/out/dataset_allround "ac3d26e1afce19aa657527f000acb88b"
+}
+
+function setup_and_start_cluster() {
+    local NUM_TASK_MANAGERS=$1
+
+    create_ha_config
+
+    set_config_key "jobmanager.execution.failover-strategy" "region"
+    set_config_key "jobmanager.scheduler.partition.force-release-on-consumption" "false"
+    set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+    set_config_key "restart-strategy" "fixed-delay"
+    set_config_key "restart-strategy.fixed-delay.attempts" "2147483647"
+
+    set_config_key "heartbeat.interval" "2000"
+    set_config_key "heartbeat.timeout" "10000"
+
+    start_local_zk
+    start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
+    start_taskmanagers ${NUM_TASK_MANAGERS}
+}
+
+function kill_and_replace_random_task_manager() {
+    local NUM_TASK_MANAGERS=$(query_number_of_running_tms)
 
-    # do not verify checkpoints in the logs
-    verify_logs ${JM_KILLS} false
+    kill_random_taskmanager
+    wait_for_number_of_running_tms $(( ${NUM_TASK_MANAGERS} - 1 ))
+    start_taskmanagers 1
+    wait_for_number_of_running_tms ${NUM_TASK_MANAGERS}
 }
 
 run_ha_test 4

Reply via email to