This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 204963910186a18da17fc26d7c3d768a9b7bf4b3
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Tue Nov 24 13:18:19 2020 +0300

    [FLINK-20118][file connector] Introduce TaskManager and JobManager failures in FileSourceTextLinesITCase
    
    This closes #14199
---
 .../file/src/FileSourceTextLinesITCase.java        | 299 +++++++++++++++++++--
 1 file changed, 276 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 88b4ba4..ce68f93 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -18,18 +18,33 @@
 
 package org.apache.flink.connector.file.src;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionWithException;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -42,9 +57,15 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -61,20 +82,86 @@ public class FileSourceTextLinesITCase extends TestLogger {
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-       @ClassRule
-       public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
-               new MiniClusterResourceConfiguration.Builder()
-                       .setNumberTaskManagers(1)
-                       .setNumberSlotsPerTaskManager(PARALLELISM)
-                       .build());
+       private static TestingMiniCluster miniCluster;
+
+       private static TestingEmbeddedHaServices highAvailabilityServices;
+
+       private static CompletedCheckpointStore checkpointStore;
+
+       @BeforeClass
+       public static void setupMiniCluster() throws Exception  {
+               highAvailabilityServices = new HaServices(TestingUtils.defaultExecutor(),
+                       () -> checkpointStore,
+                       new StandaloneCheckpointIDCounter());
+
+               final Configuration configuration = createConfiguration();
+
+               miniCluster = new TestingMiniCluster(
+                       new TestingMiniClusterConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumTaskManagers(1)
+                               .setNumSlotsPerTaskManager(PARALLELISM)
+                               .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                               .build(),
+                       () -> highAvailabilityServices);
+
+               miniCluster.start();
+       }
+
+       private static Configuration createConfiguration() throws IOException {
+               final Configuration configuration = new Configuration();
+               final String checkPointDir = Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
+               configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkPointDir);
+               return configuration;
+       }
+
+       @Before
+       public void setup() {
+               checkpointStore = new RecoverableCompletedCheckpointStore();
+       }
+
+       @AfterClass
+       public static void shutdownMiniCluster() throws Exception {
+               if (miniCluster != null) {
+                       miniCluster.close();
+               }
+               if (highAvailabilityServices != null) {
+                       highAvailabilityServices.closeAndCleanupAllData();
+                       highAvailabilityServices = null;
+               }
+       }
 
        // ------------------------------------------------------------------------
+       //  test cases
+       // ------------------------------------------------------------------------
 
        /**
         * This test runs a job reading bounded input with a stream record format (text lines).
         */
        @Test
        public void testBoundedTextFileSource() throws Exception {
+               testBoundedTextFileSource(FailoverType.NONE);
+       }
+
+       /**
+        * This test runs a job reading bounded input with a stream record format (text lines)
+        * and restarts TaskManager.
+        */
+       @Test
+       public void testBoundedTextFileSourceWithTaskManagerFailover() throws Exception {
+               testBoundedTextFileSource(FailoverType.TM);
+       }
+
+       /**
+        * This test runs a job reading bounded input with a stream record format (text lines)
+        * and triggers JobManager failover.
+        */
+       @Test
+       public void testBoundedTextFileSourceWithJobManagerFailover() throws Exception {
+               testBoundedTextFileSource(FailoverType.JM);
+       }
+
+       private void testBoundedTextFileSource(FailoverType failoverType) throws Exception {
                final File testDir = TMP_FOLDER.newFolder();
 
                // our main test data
@@ -84,18 +171,32 @@ public class FileSourceTextLinesITCase extends TestLogger {
                writeHiddenJunkFiles(testDir);
 
                final FileSource<String> source = FileSource
-                               .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
-                               .build();
+                       .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                       .build();
 
-               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
                env.setParallelism(PARALLELISM);
 
                final DataStream<String> stream = env.fromSource(
-                               source,
-                               WatermarkStrategy.noWatermarks(),
-                               "file-source");
+                       source,
+                       WatermarkStrategy.noWatermarks(),
+                       "file-source");
+
+               final DataStream<String> streamFailingInTheMiddleOfReading =
+                       RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2);
 
-               final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
+               final ClientAndIterator<String> client = DataStreamUtils.collectWithClient(
+                       streamFailingInTheMiddleOfReading,
+                       "Bounded TextFiles Test");
+               final JobID jobId = client.client.getJobID();
+
+               RecordCounterToFail.waitToFail();
+               triggerFailover(failoverType, jobId, RecordCounterToFail::continueProcessing);
+
+               final List<String> result = new ArrayList<>();
+               while (client.iterator.hasNext()) {
+                       result.add(client.iterator.next());
+               }
 
                verifyResult(result);
        }
@@ -106,23 +207,47 @@ public class FileSourceTextLinesITCase extends TestLogger {
         */
        @Test
        public void testContinuousTextFileSource() throws Exception {
+               testContinuousTextFileSource(FailoverType.NONE);
+       }
+
+       /**
+        * This test runs a job reading continuous input (files appearing over time)
+        * with a stream record format (text lines) and restarts TaskManager.
+        */
+       @Test
+       public void testContinuousTextFileSourceWithTaskManagerFailover() throws Exception {
+               testContinuousTextFileSource(FailoverType.TM);
+       }
+
+       /**
+        * This test runs a job reading continuous input (files appearing over time)
+        * with a stream record format (text lines) and triggers JobManager failover.
+        */
+       @Test
+       public void testContinuousTextFileSourceWithJobManagerFailover() throws Exception {
+               testContinuousTextFileSource(FailoverType.JM);
+       }
+
+       private void testContinuousTextFileSource(FailoverType type) throws Exception {
                final File testDir = TMP_FOLDER.newFolder();
 
                final FileSource<String> source = FileSource
-                               .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
-                               .monitorContinuously(Duration.ofMillis(5))
-                               .build();
+                       .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                       .monitorContinuously(Duration.ofMillis(5))
+                       .build();
 
-               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
                env.setParallelism(PARALLELISM);
+               env.enableCheckpointing(10L);
 
                final DataStream<String> stream = env.fromSource(
-                               source,
-                               WatermarkStrategy.noWatermarks(),
-                               "file-source");
+                       source,
+                       WatermarkStrategy.noWatermarks(),
+                       "file-source");
 
                final ClientAndIterator<String> client =
                                DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
+               final JobID jobId = client.client.getJobID();
 
                // write one file, execute, and wait for its result
                // that way we know that the application was running and the source has
@@ -138,9 +263,15 @@ public class FileSourceTextLinesITCase extends TestLogger {
                for (int i = 1; i < LINES_PER_FILE.length; i++) {
                        Thread.sleep(10);
                        writeFile(testDir, i);
+                       final boolean failAfterHalfOfInput = i == LINES_PER_FILE.length / 2;
+                       if (failAfterHalfOfInput) {
+                               triggerFailover(type, jobId, () -> {});
+                       }
                }
 
-               final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
+               final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(
+                       client,
+                       numLinesAfter);
 
                // shut down the job, now that we have all the results we expected.
                client.client.cancel().get();
@@ -150,6 +281,45 @@ public class FileSourceTextLinesITCase extends TestLogger {
        }
 
        // ------------------------------------------------------------------------
+       //  test utilities
+       // ------------------------------------------------------------------------
+
+       private enum FailoverType {
+               NONE,
+               TM,
+               JM
+       }
+
+       private static void triggerFailover(
+                       FailoverType type,
+                       JobID jobId,
+                       Runnable afterFailAction) throws Exception {
+               switch (type) {
+                       case NONE:
+                               afterFailAction.run();
+                               break;
+                       case TM:
+                               restartTaskManager(afterFailAction);
+                               break;
+                       case JM:
+                               triggerJobManagerFailover(jobId, afterFailAction);
+                               break;
+               }
+       }
+
+       private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction) throws Exception {
+               highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+               afterFailAction.run();
+               highAvailabilityServices.grantJobMasterLeadership(jobId).get();
+       }
+
+       private static void restartTaskManager(Runnable afterFailAction) throws Exception {
+               miniCluster.terminateTaskExecutor(0).get();
+               afterFailAction.run();
+               miniCluster.startTaskExecutor();
+       }
+
+       // ------------------------------------------------------------------------
        //  verification
        // ------------------------------------------------------------------------
 
@@ -308,4 +478,87 @@ public class FileSourceTextLinesITCase extends TestLogger {
 
                assertTrue(stagingFile.renameTo(file));
        }
+
+       // ------------------------------------------------------------------------
+       //  mini cluster failover utilities
+       // ------------------------------------------------------------------------
+
+       private static class HaServices extends TestingEmbeddedHaServices {
+               private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
+               private final CheckpointIDCounter checkpointIDCounter;
+
+               private HaServices(
+                               Executor executor,
+                               Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
+                               CheckpointIDCounter checkpointIDCounter) {
+                       super(executor);
+                       this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
+                       this.checkpointIDCounter = checkpointIDCounter;
+               }
+
+               @Override
+               public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+                       return new CheckpointRecoveryFactoryWithSettableStore(
+                               completedCheckpointStoreFactory,
+                               checkpointIDCounter);
+               }
+       }
+
+       private static class CheckpointRecoveryFactoryWithSettableStore implements CheckpointRecoveryFactory {
+               private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
+               private final CheckpointIDCounter checkpointIDCounter;
+
+               private CheckpointRecoveryFactoryWithSettableStore(
+                               Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
+                               CheckpointIDCounter checkpointIDCounter) {
+                       this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
+                       this.checkpointIDCounter = checkpointIDCounter;
+               }
+
+               @Override
+               public CompletedCheckpointStore createCheckpointStore(
+                               JobID jobId,
+                               int maxNumberOfCheckpointsToRetain,
+                               ClassLoader userClassLoader) {
+                       return completedCheckpointStoreFactory.get();
+               }
+
+               @Override
+               public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+                       return checkpointIDCounter;
+               }
+       }
+
+       private static class RecordCounterToFail {
+
+               private static AtomicInteger records;
+               private static CompletableFuture<Void> fail;
+               private static CompletableFuture<Void> continueProcessing;
+
+               private static <T> DataStream<T> wrapWithFailureAfter(
+                               DataStream<T> stream,
+                               int failAfter) {
+
+                       records = new AtomicInteger();
+                       fail = new CompletableFuture<>();
+                       continueProcessing = new CompletableFuture<>();
+                       return stream.map(record -> {
+                               final boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
+                               final boolean notFailedYet = !fail.isDone();
+                               if (notFailedYet && halfOfInputIsRead) {
+                                       fail.complete(null);
+                                       continueProcessing.get();
+                               }
+                               return record;
+                       });
+               }
+
+               private static void waitToFail() throws ExecutionException, InterruptedException {
+                       fail.get();
+               }
+
+               private static void continueProcessing() {
+                       continueProcessing.complete(null);
+               }
+       }
 }

Reply via email to