This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 204963910186a18da17fc26d7c3d768a9b7bf4b3
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Tue Nov 24 13:18:19 2020 +0300

    [FLINK-20118][file connector] Introduce TaskManager and JobManager failures in FileSourceTextLinesITCase

    This closes #14199
---
 .../file/src/FileSourceTextLinesITCase.java | 299 +++++++++++++++++++--
 1 file changed, 276 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 88b4ba4..ce68f93 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -18,18 +18,33 @@
 
 package org.apache.flink.connector.file.src;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionWithException;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -42,9 +57,15 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -61,20 +82,86 @@ public class FileSourceTextLinesITCase extends TestLogger {
 	@ClassRule
 	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-	@ClassRule
-	public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
-			new MiniClusterResourceConfiguration.Builder()
-					.setNumberTaskManagers(1)
-					.setNumberSlotsPerTaskManager(PARALLELISM)
-					.build());
+	private static TestingMiniCluster miniCluster;
+
+	private static TestingEmbeddedHaServices highAvailabilityServices;
+
+	private static CompletedCheckpointStore checkpointStore;
+
+	@BeforeClass
+	public static void setupMiniCluster() throws Exception {
+		highAvailabilityServices = new HaServices(TestingUtils.defaultExecutor(),
+			() -> checkpointStore,
+			new StandaloneCheckpointIDCounter());
+
+		final Configuration configuration = createConfiguration();
+
+		miniCluster = new TestingMiniCluster(
+			new TestingMiniClusterConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumTaskManagers(1)
+				.setNumSlotsPerTaskManager(PARALLELISM)
+				.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+				.build(),
+			() -> highAvailabilityServices);
+
+		miniCluster.start();
+	}
+
+	private static Configuration createConfiguration() throws IOException {
+		final Configuration configuration = new Configuration();
+		final String checkPointDir = Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
+		configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkPointDir);
+		return configuration;
+	}
+
+	@Before
+	public void setup() {
+		checkpointStore = new RecoverableCompletedCheckpointStore();
+	}
+
+	@AfterClass
+	public static void shutdownMiniCluster() throws Exception {
+		if (miniCluster != null) {
+			miniCluster.close();
+		}
+		if (highAvailabilityServices != null) {
+			highAvailabilityServices.closeAndCleanupAllData();
+			highAvailabilityServices = null;
+		}
+	}
 
 	// ------------------------------------------------------------------------
+	//  test cases
+	// ------------------------------------------------------------------------
 
 	/**
 	 * This test runs a job reading bounded input with a stream record format (text lines).
 	 */
 	@Test
 	public void testBoundedTextFileSource() throws Exception {
+		testBoundedTextFileSource(FailoverType.NONE);
+	}
+
+	/**
+	 * This test runs a job reading bounded input with a stream record format (text lines)
+	 * and restarts TaskManager.
+	 */
+	@Test
+	public void testBoundedTextFileSourceWithTaskManagerFailover() throws Exception {
+		testBoundedTextFileSource(FailoverType.TM);
+	}
+
+	/**
+	 * This test runs a job reading bounded input with a stream record format (text lines)
+	 * and triggers JobManager failover.
+	 */
+	@Test
+	public void testBoundedTextFileSourceWithJobManagerFailover() throws Exception {
+		testBoundedTextFileSource(FailoverType.JM);
+	}
+
+	private void testBoundedTextFileSource(FailoverType failoverType) throws Exception {
 		final File testDir = TMP_FOLDER.newFolder();
 
 		// our main test data
@@ -84,18 +171,32 @@ public class FileSourceTextLinesITCase extends TestLogger {
 		writeHiddenJunkFiles(testDir);
 
 		final FileSource<String> source = FileSource
-				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
-				.build();
+			.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+			.build();
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
 		env.setParallelism(PARALLELISM);
 
 		final DataStream<String> stream = env.fromSource(
-				source,
-				WatermarkStrategy.noWatermarks(),
-				"file-source");
+			source,
+			WatermarkStrategy.noWatermarks(),
+			"file-source");
+
+		final DataStream<String> streamFailingInTheMiddleOfReading =
+			RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2);
 
-		final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
+		final ClientAndIterator<String> client = DataStreamUtils.collectWithClient(
+			streamFailingInTheMiddleOfReading,
+			"Bounded TextFiles Test");
+		final JobID jobId = client.client.getJobID();
+
+		RecordCounterToFail.waitToFail();
+		triggerFailover(failoverType, jobId, RecordCounterToFail::continueProcessing);
+
+		final List<String> result = new ArrayList<>();
+		while (client.iterator.hasNext()) {
+			result.add(client.iterator.next());
+		}
 
 		verifyResult(result);
 	}
@@ -106,23 +207,47 @@ public class FileSourceTextLinesITCase extends TestLogger {
 	 */
 	@Test
 	public void testContinuousTextFileSource() throws Exception {
+		testContinuousTextFileSource(FailoverType.NONE);
+	}
+
+	/**
+	 * This test runs a job reading continuous input (files appearing over time)
+	 * with a stream record format (text lines) and restarts TaskManager.
+	 */
+	@Test
+	public void testContinuousTextFileSourceWithTaskManagerFailover() throws Exception {
+		testContinuousTextFileSource(FailoverType.TM);
+	}
+
+	/**
+	 * This test runs a job reading continuous input (files appearing over time)
+	 * with a stream record format (text lines) and triggers JobManager failover.
+	 */
+	@Test
+	public void testContinuousTextFileSourceWithJobManagerFailover() throws Exception {
+		testContinuousTextFileSource(FailoverType.JM);
+	}
+
+	private void testContinuousTextFileSource(FailoverType type) throws Exception {
 		final File testDir = TMP_FOLDER.newFolder();
 
 		final FileSource<String> source = FileSource
-				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
-				.monitorContinuously(Duration.ofMillis(5))
-				.build();
+			.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+			.monitorContinuously(Duration.ofMillis(5))
+			.build();
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
 		env.setParallelism(PARALLELISM);
+		env.enableCheckpointing(10L);
 
 		final DataStream<String> stream = env.fromSource(
-				source,
-				WatermarkStrategy.noWatermarks(),
-				"file-source");
+			source,
+			WatermarkStrategy.noWatermarks(),
+			"file-source");
 
 		final ClientAndIterator<String> client =
				DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
+		final JobID jobId = client.client.getJobID();
 
 		// write one file, execute, and wait for its result
 		// that way we know that the application was running and the source has
@@ -138,9 +263,15 @@ public class FileSourceTextLinesITCase extends TestLogger {
 		for (int i = 1; i < LINES_PER_FILE.length; i++) {
 			Thread.sleep(10);
 			writeFile(testDir, i);
+			final boolean failAfterHalfOfInput = i == LINES_PER_FILE.length / 2;
+			if (failAfterHalfOfInput) {
+				triggerFailover(type, jobId, () -> {});
+			}
 		}
 
-		final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
+		final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(
+			client,
+			numLinesAfter);
 
 		// shut down the job, now that we have all the results we expected.
 		client.client.cancel().get();
@@ -150,6 +281,45 @@
 	}
 
 	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	private enum FailoverType {
+		NONE,
+		TM,
+		JM
+	}
+
+	private static void triggerFailover(
+			FailoverType type,
+			JobID jobId,
+			Runnable afterFailAction) throws Exception {
+		switch (type) {
+			case NONE:
+				afterFailAction.run();
+				break;
+			case TM:
+				restartTaskManager(afterFailAction);
+				break;
+			case JM:
+				triggerJobManagerFailover(jobId, afterFailAction);
+				break;
+		}
+	}
+
+	private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction) throws Exception {
+		highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+		afterFailAction.run();
+		highAvailabilityServices.grantJobMasterLeadership(jobId).get();
+	}
+
+	private static void restartTaskManager(Runnable afterFailAction) throws Exception {
+		miniCluster.terminateTaskExecutor(0).get();
+		afterFailAction.run();
+		miniCluster.startTaskExecutor();
+	}
+
+	// ------------------------------------------------------------------------
 	//  verification
 	// ------------------------------------------------------------------------
 
@@ -308,4 +478,87 @@ public class FileSourceTextLinesITCase extends TestLogger {
 		assertTrue(stagingFile.renameTo(file));
 	}
+
+	// ------------------------------------------------------------------------
+	//  mini cluster failover utilities
+	// ------------------------------------------------------------------------
+
+	private static class HaServices extends TestingEmbeddedHaServices {
+		private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
+		private final CheckpointIDCounter checkpointIDCounter;
+
+		private HaServices(
+				Executor executor,
+				Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
+				CheckpointIDCounter checkpointIDCounter) {
+			super(executor);
+			this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
+			this.checkpointIDCounter = checkpointIDCounter;
+		}
+
+		@Override
+		public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+			return new CheckpointRecoveryFactoryWithSettableStore(
+				completedCheckpointStoreFactory,
+				checkpointIDCounter);
+		}
+	}
+
+	private static class CheckpointRecoveryFactoryWithSettableStore implements CheckpointRecoveryFactory {
+		private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
+		private final CheckpointIDCounter checkpointIDCounter;
+
+		private CheckpointRecoveryFactoryWithSettableStore(
+				Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
+				CheckpointIDCounter checkpointIDCounter) {
+			this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
+			this.checkpointIDCounter = checkpointIDCounter;
+		}
+
+		@Override
+		public CompletedCheckpointStore createCheckpointStore(
+				JobID jobId,
+				int maxNumberOfCheckpointsToRetain,
+				ClassLoader userClassLoader) {
+			return completedCheckpointStoreFactory.get();
+		}
+
+		@Override
+		public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+			return checkpointIDCounter;
+		}
+	}
+
+	private static class RecordCounterToFail {
+
+		private static AtomicInteger records;
+		private static CompletableFuture<Void> fail;
+		private static CompletableFuture<Void> continueProcessing;
+
+		private static <T> DataStream<T> wrapWithFailureAfter(
+				DataStream<T> stream,
+				int failAfter) {
+
+			records = new AtomicInteger();
+			fail = new CompletableFuture<>();
+			continueProcessing = new CompletableFuture<>();
+			return stream.map(record -> {
+				final boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
+				final boolean notFailedYet = !fail.isDone();
+				if (notFailedYet && halfOfInputIsRead) {
+					fail.complete(null);
+					continueProcessing.get();
+				}
+				return record;
+			});
+		}
+
+		private static void waitToFail() throws ExecutionException, InterruptedException {
+			fail.get();
+		}
+
+		private static void continueProcessing() {
+			continueProcessing.complete(null);
+		}
+	}
 
 }
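A note on the mechanics, for reviewers who want the gist without reading every hunk. The failure injection rests on a small handshake in RecordCounterToFail: a map operator counts records, and once it has passed failAfter of them it completes a "fail" future and then blocks on a "continueProcessing" future; the test thread waits on the first future, injects the TaskManager or JobManager failure, and then releases the pipeline via the second. The sketch below distills that handshake into plain Java with no Flink types. It is illustrative only: FailureHandshake and wrap are invented names, and wrap stands in for the patch's stream.map(...) call.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.UnaryOperator;

    // Illustrative distillation of the RecordCounterToFail handshake
    // (not part of the commit).
    final class FailureHandshake {

        private final AtomicInteger records = new AtomicInteger();
        private final CompletableFuture<Void> fail = new CompletableFuture<>();
        private final CompletableFuture<Void> continueProcessing = new CompletableFuture<>();

        // Stands in for the patch's stream.map(...): counts records and, once
        // past the threshold, signals the test thread and parks the pipeline.
        <T> UnaryOperator<T> wrap(int failAfter) {
            return record -> {
                final boolean pastThreshold = records.incrementAndGet() > failAfter;
                if (pastThreshold && !fail.isDone()) {
                    fail.complete(null);       // tell the test thread: fail me now
                    continueProcessing.join(); // park until the failover is done
                }
                return record;
            };
        }

        // Test thread: block until the pipeline has reached the threshold.
        void waitToFail() {
            fail.join();
        }

        // Test thread: release the parked pipeline after injecting the failure.
        void continueProcessing() {
            continueProcessing.complete(null);
        }
    }

The patch itself keeps these fields static rather than instance-scoped, presumably because the map function is serialized into the job graph and the deserialized copies would not share instance state; within a single-JVM MiniCluster the static fields are shared between test thread and tasks, which is what makes the handshake work. The sketch uses join() instead of get() only to keep the lambda free of checked exceptions.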
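The JobManager failover path is also worth spelling out, because no process dies: the test revokes JobMaster leadership and then grants it again through the embedded HA services. Recovery works only because the custom HaServices hands every new leader the same RecoverableCompletedCheckpointStore, so the re-elected JobMaster finds its predecessor's completed checkpoints; the TaskManager variant is more direct, terminating the single task executor and starting a fresh one. Here is the revoke/grant sequence again, unchanged from the patch except for the added comments:

    private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction) throws Exception {
        // Revoking leadership makes the current JobMaster step down and
        // suspend the job, mimicking a JobManager loss without killing a process.
        highAvailabilityServices.revokeJobMasterLeadership(jobId).get();

        // Runs while no leader exists; the bounded test passes
        // RecordCounterToFail::continueProcessing here to release the parked
        // pipeline, the continuous test passes a no-op.
        afterFailAction.run();

        // Granting leadership again starts a new leader session, which
        // recovers the job from the shared completed-checkpoint store.
        highAvailabilityServices.grantJobMasterLeadership(jobId).get();
    }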
