This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c54f5a766505ea64e0c590ab68e0f121ef3ebc73 Author: Yun Gao <[email protected]> AuthorDate: Fri Nov 27 19:33:15 2020 +0800 [FLINK-20337] Add ITCase for migrating from StreamingFileSink to FileSink --- .../flink-connector-migration-test/pom.xml | 8 + .../file/sink/writer/FileSinkMigrationITCase.java | 287 +++++++++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/flink-connectors/flink-connector-migration-test/pom.xml b/flink-connectors/flink-connector-migration-test/pom.xml index 9a16ada..55b0f41 100644 --- a/flink-connectors/flink-connector-migration-test/pom.xml +++ b/flink-connectors/flink-connector-migration-test/pom.xml @@ -54,5 +54,13 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-files</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java new file mode 100644 index 0000000..2f6ed68 --- /dev/null +++ b/flink-connectors/flink-connector-migration-test/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import 
org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +/** + * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It triggers a savepoint + * for the {@link StreamingFileSink} job and restores the {@link FileSink} job from the savepoint + * taken. + */ +public class FileSinkMigrationITCase extends TestLogger { + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final String SOURCE_UID = "source"; + + private static final String SINK_UID = "sink"; + + private static final int NUM_SOURCES = 4; + + private static final int NUM_SINKS = 3; + + private static final int NUM_RECORDS = 10000; + + private static final int NUM_BUCKETS = 4; + + private static final Map<String, CountDownLatch> SAVEPOINT_LATCH_MAP = new ConcurrentHashMap<>(); + + private static final Map<String, CountDownLatch> FINAL_CHECKPOINT_LATCH_MAP = new ConcurrentHashMap<>(); + + private String latchId; + + @Before + public void setup() { + this.latchId = UUID.randomUUID().toString(); + SAVEPOINT_LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES)); + + // We wait for two successful checkpoints in sources before shutting down. This ensures that + // the sink can commit its data. + // We need to keep a "static" latch here because all sources need to be kept running + // while we're waiting for the required number of checkpoints. Otherwise, we would lock up + // because we can only do checkpoints while all operators are running. 
+ FINAL_CHECKPOINT_LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 2)); + } + + @After + public void teardown() { + SAVEPOINT_LATCH_MAP.remove(latchId); + FINAL_CHECKPOINT_LATCH_MAP.remove(latchId); + } + + @Test + public void testMigration() throws Exception { + String outputPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + String savepointBasePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + final Configuration config = new Configuration(); + config.setString(RestOptions.BIND_PORT, "18081-19000"); + final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(4) + .setConfiguration(config) + .build(); + + JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath); + String savepointPath = executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath); + + JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath); + loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath); + + IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES); + } + + private JobGraph createStreamingFileSinkJobGraph(String outputPath) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + + StreamingFileSink<Integer> sink = StreamingFileSink + .forRowFormat(new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder()) + .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS)) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) + .build(); + + env.addSource(new StatefulSource(true, latchId)) + .uid(SOURCE_UID) + .setParallelism(NUM_SOURCES) + .addSink(sink) + .setParallelism(NUM_SINKS) + .uid(SINK_UID); + return env.getStreamGraph().getJobGraph(); + } + + private JobGraph createFileSinkJobGraph(String outputPath) { + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + + FileSink<Integer> sink = FileSink + .forRowFormat(new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder()) + .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS)) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) + .build(); + + env.addSource(new StatefulSource(false, latchId)) + .uid(SOURCE_UID) + .setParallelism(NUM_SOURCES) + .sinkTo(sink) + .setParallelism(NUM_SINKS) + .uid(SINK_UID); + return env.getStreamGraph().getJobGraph(); + } + + private String executeAndTakeSavepoint( + MiniClusterConfiguration cfg, + JobGraph jobGraph, + String savepointBasePath) throws Exception { + try (MiniCluster miniCluster = new MiniCluster(cfg)) { + miniCluster.start(); + CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = miniCluster.submitJob(jobGraph); + JobID jobId = jobSubmissionResultFuture.get().getJobID(); + + // wait until we can take a savepoint + CountDownLatch latch = SAVEPOINT_LATCH_MAP.get(latchId); + latch.await(); + + CompletableFuture<String> savepointResultFuture = miniCluster.triggerSavepoint(jobId, savepointBasePath, true); + return savepointResultFuture.get(); + } + } + + private void loadSavepointAndExecute( + MiniClusterConfiguration cfg, + JobGraph jobGraph, + String savepointPath + ) throws Exception { + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false)); + + try (MiniCluster miniCluster = new MiniCluster(cfg)) { + miniCluster.start(); + miniCluster.executeJobBlocking(jobGraph); + } + } + + private static class StatefulSource extends RichParallelSourceFunction<Integer> implements + CheckpointedFunction, CheckpointListener { + + private final boolean takingSavepointMode; + + private final String latchId; + + private ListState<Integer> nextValueState; + + private int nextValue; + + private volatile boolean 
snapshottedAfterAllRecordsOutput; + + private volatile boolean isWaitingCheckpointComplete; + + private volatile boolean isCanceled; + + public StatefulSource(boolean takingSavepointMode, String latchId) { + this.takingSavepointMode = takingSavepointMode; + this.latchId = latchId; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + nextValueState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("nextValue", Integer.class)); + + if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) { + nextValue = nextValueState.get().iterator().next(); + } + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + if (takingSavepointMode) { + sendRecordsUntil(NUM_RECORDS / 3, 0, ctx); + + CountDownLatch latch = SAVEPOINT_LATCH_MAP.get(latchId); + latch.countDown(); + + sendRecordsUntil(NUM_RECORDS / 2, 100, ctx); + + while (true) { + Thread.sleep(5000); + } + } else { + sendRecordsUntil(NUM_RECORDS, 0, ctx); + + // Wait for the last checkpoint to commit all the pending records. 
+ isWaitingCheckpointComplete = true; + CountDownLatch latch = FINAL_CHECKPOINT_LATCH_MAP.get(latchId); + latch.await(); + } + } + + private void sendRecordsUntil(int targetNumber, int sleepInMillis, SourceContext<Integer> ctx) + throws InterruptedException { + while (!isCanceled && nextValue < targetNumber) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(nextValue++); + } + + if (sleepInMillis > 0) { + Thread.sleep(sleepInMillis); + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + nextValueState.update(Collections.singletonList(nextValue)); + + if (isWaitingCheckpointComplete) { + snapshottedAfterAllRecordsOutput = true; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) { + CountDownLatch latch = FINAL_CHECKPOINT_LATCH_MAP.get(latchId); + latch.countDown(); + } + } + + @Override + public void cancel() { + isCanceled = true; + } + + } +}
