This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aec2d38710a67d90bd819bfdce66b5a5a646a882
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Wed Feb 16 18:38:48 2022 +0800

    [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.

    This closes #18680.
---
 .../file/sink/BatchCompactingFileSinkITCase.java   |  69 ++++
 .../file/sink/FileSinkCompactionSwitchITCase.java  | 391 +++++++++++++++++++++
 .../flink/connector/file/sink/FileSinkITBase.java  |   4 +-
 .../sink/StreamingCompactingFileSinkITCase.java    |  69 ++++
 4 files changed, 532 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
new file mode 100644
index 0000000..5167e97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests the compaction of the {@link FileSink} in BATCH mode. */
+@RunWith(Parameterized.class)
+public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(createFileCompactStrategy(), createFileCompactor())
+                .build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
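+    // Size-driven compaction: pending files are grouped into one compaction until roughly
+    // 10000 bytes have accumulated.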
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
new file mode 100644
index 0000000..4a7c0f0
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder;
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for switching compaction on or off for the {@link FileSink}. */
+@RunWith(Parameterized.class)
+public class FileSinkCompactionSwitchITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    protected static final int NUM_SOURCES = 4;
+
+    protected static final int NUM_SINKS = 3;
+
+    protected static final int NUM_RECORDS = 10000;
+
+    protected static final int NUM_BUCKETS = 4;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<>();
+
+    private String latchId;
+
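+    // If true, the job starts with compaction enabled and restores from the savepoint with
+    // compaction disabled; if false, the switch goes the other way around.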
+    @Parameterized.Parameter public boolean isOnToOff;
+
+    @Parameterized.Parameters(name = "isOnToOff = {0}")
+    public static Collection<Object[]> params() {
+        return Arrays.asList(new Object[] {false}, new Object[] {true});
+    }
+
+    @Before
+    public void setup() {
+        this.latchId = UUID.randomUUID().toString();
+        // Wait for 3 checkpoints to ensure that the coordinator and all compactors have state
+        LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 3));
+    }
+
+    @After
+    public void teardown() {
+        LATCH_MAP.remove(latchId);
+    }
+
+    @Test
+    public void testSwitchingCompaction() throws Exception {
+        String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
+                sharedObjects.add(new ConcurrentHashMap<>());
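+        // The first job graph runs with the initial compaction setting; the second one flips
+        // the setting and will restore from the first job's savepoint.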
+        JobGraph jobGraph = createJobGraph(path, isOnToOff, false, sendCountMap);
+        JobGraph restoringJobGraph = createJobGraph(path, !isOnToOff, true, sendCountMap);
+
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
+                        .build();
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.submitJob(jobGraph);
+
+            LATCH_MAP.get(latchId).await();
+
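+            // Take a canonical savepoint and cancel the running job, so that the job with the
+            // opposite compaction setting can restore from it.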
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(
+                                    jobGraph.getJobID(),
+                                    TEMPORARY_FOLDER.newFolder().getAbsolutePath(),
+                                    true,
+                                    SavepointFormatType.CANONICAL)
+                            .get();
+
+            // We wait for two successful checkpoints in sources before shutting down. This
+            // ensures that the sink can commit its data.
+            LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 2));
+
+            restoringJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, false));
+            miniCluster.executeJobBlocking(restoringJobGraph);
+        }
+
+        checkIntegerSequenceSinkOutput(path, sendCountMap.get(), NUM_BUCKETS, NUM_SOURCES);
+    }
+
+    private JobGraph createJobGraph(
+            String path,
+            boolean compactionEnabled,
+            boolean isFinite,
+            SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+        env.configure(config, getClass().getClassLoader());
+
+        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.addSource(new CountingTestSource(latchId, NUM_RECORDS, isFinite, sendCountMap))
+                .setParallelism(NUM_SOURCES)
+                .sinkTo(createFileSink(path, compactionEnabled))
+                .uid("sink")
+                .setParallelism(NUM_SINKS);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        return streamGraph.getJobGraph();
+    }
+
+    private FileSink<Integer> createFileSink(String path, boolean compactionEnabled) {
+        DefaultRowFormatBuilder<Integer> sinkBuilder =
+                FileSink.forRowFormat(new Path(path), new IntEncoder())
+                        .withBucketAssigner(new ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(
+                                new FileSinkITBase.PartSizeAndCheckpointRollingPolicy(1024));
+
+        if (compactionEnabled) {
+            sinkBuilder =
+                    sinkBuilder.enableCompact(createFileCompactStrategy(), createFileCompactor());
+        }
+
+        return sinkBuilder.build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(2).build();
+    }
+
+    private static void checkIntegerSequenceSinkOutput(
+            String path, Map<Integer, Integer> countMap, int numBuckets, int numSources)
+            throws Exception {
+        assertEquals(numSources, countMap.size());
+
+        File dir = new File(path);
+        String[] subDirNames = dir.list();
+        assertNotNull(subDirNames);
+
+        Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
+        assertEquals(numBuckets, subDirNames.length);
+        for (int i = 0; i < numBuckets; ++i) {
+            assertEquals(Integer.toString(i), subDirNames[i]);
+
+            // Now check its content
+            File bucketDir = new File(path, subDirNames[i]);
+            assertTrue(
+                    bucketDir.getAbsolutePath() + " should be an existing directory",
+                    bucketDir.isDirectory());
+
+            Map<Integer, Integer> counts = new HashMap<>();
+            File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
+            assertNotNull(files);
+
+            for (File file : files) {
+                assertTrue(file.isFile());
+
+                try (DataInputStream dataInputStream =
+                        new DataInputStream(new FileInputStream(file))) {
+                    while (true) {
+                        int value = dataInputStream.readInt();
+                        counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
+                    }
+                } catch (EOFException e) {
+                    // End of the file reached
+                }
+            }
+
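+            // Each source emits the consecutive integers [0, count) and the bucket assigner
+            // routes value v to bucket v % numBuckets, so the number of distinct values in
+            // bucket i is determined by the largest per-source send count.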
+            int bucketId = i;
+            int expectedCount =
+                    countMap.values().stream()
+                            .map(
+                                    numRecords ->
+                                            numRecords / numBuckets
+                                                    + (bucketId < numRecords % numBuckets
+                                                            ? 1
+                                                            : 0))
+                            .mapToInt(num -> num)
+                            .max()
+                            .getAsInt();
+            assertEquals(expectedCount, counts.size());
+
+            List<Integer> countList = new ArrayList<>(countMap.values());
+            Collections.sort(countList);
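+            // A value k is written once by every source whose send count exceeds k. Since
+            // NUM_SOURCES == NUM_BUCKETS here, values between the (j-1)-th and the j-th
+            // smallest send counts should each occur (numBuckets - j) times in this bucket.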
+            for (int j = 0; j < countList.size(); j++) {
+                int rangeFrom = j == 0 ? 0 : countList.get(j - 1);
+                rangeFrom =
+                        bucketId
+                                + (rangeFrom % numBuckets == 0
+                                        ? rangeFrom
+                                        : (rangeFrom + numBuckets - rangeFrom % numBuckets));
+                int rangeTo = countList.get(j);
+                for (int k = rangeFrom; k < rangeTo; k += numBuckets) {
+                    assertEquals(
+                            "The record "
+                                    + k
+                                    + " should occur "
+                                    + (numBuckets - j)
+                                    + " times, but only occurs "
+                                    + counts.getOrDefault(k, 0)
+                                    + " times",
+                            numBuckets - j,
+                            counts.getOrDefault(k, 0).intValue());
+                }
+            }
+        }
+    }
+
+    private static class CountingTestSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointListener, CheckpointedFunction {
+
+        private final String latchId;
+
+        private final int numberOfRecords;
+
+        private final boolean isFinite;
+
+        private final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap;
+
+        private ListState<Integer> nextValueState;
+
+        private int nextValue;
+
+        private volatile boolean isCanceled;
+
+        private volatile boolean snapshottedAfterAllRecordsOutput;
+
+        private volatile boolean isWaitingCheckpointComplete;
+
+        public CountingTestSource(
+                String latchId,
+                int numberOfRecords,
+                boolean isFinite,
+                SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+            this.latchId = latchId;
+            this.numberOfRecords = numberOfRecords;
+            this.isFinite = isFinite;
+            this.sendCountMap = sendCountMap;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            nextValueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("nextValue", Integer.class));
+
+            if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
+                nextValue = nextValueState.get().iterator().next();
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            // A finite source stops after the given number of records; an infinite one keeps
+            // emitting until it is canceled.
+            sendRecordsUntil(isFinite ? (nextValue + numberOfRecords) : Integer.MAX_VALUE, ctx);
+
+            // Wait for the last checkpoint to commit all the pending records.
+            isWaitingCheckpointComplete = true;
+            CountDownLatch latch = LATCH_MAP.get(latchId);
+            latch.await();
+        }
+
+        private void sendRecordsUntil(int targetNumber, SourceContext<Integer> ctx) {
+            while (!isCanceled && nextValue < targetNumber) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(nextValue++);
+                }
+            }
+        }
+
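+        // Records how many values this subtask has emitted so far; the final output check
+        // derives the expected file contents from these counts.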
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            nextValueState.update(Collections.singletonList(nextValue));
+            sendCountMap.consumeSync(
+                    m -> m.put(getRuntimeContext().getIndexOfThisSubtask(), nextValue));
+
+            if (isWaitingCheckpointComplete) {
+                snapshottedAfterAllRecordsOutput = true;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (!isFinite || (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput)) {
+                CountDownLatch latch = LATCH_MAP.get(latchId);
+                latch.countDown();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6c424ec..50f8107 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.util.TestLogger;
 
@@ -94,7 +95,8 @@ public abstract class FileSinkITBase extends TestLogger {
                 .build();
     }
 
-    private static class PartSizeAndCheckpointRollingPolicy
+    /** The testing {@link RollingPolicy} that rolls based on a maximum part-file size. */
+    protected static class PartSizeAndCheckpointRollingPolicy
             extends CheckpointRollingPolicy<Integer, String> {
 
         private final long maxPartSize;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
new file mode 100644
index 0000000..227d49a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests the compaction of the {@link FileSink} in STREAMING mode. */
+@RunWith(Parameterized.class)
+public class StreamingCompactingFileSinkITCase extends StreamingExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(createFileCompactStrategy(), createFileCompactor())
+                .build();
+    }
+
+    private static FileCompactor createFileCompactor() {
+        return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+    }
+
+    private static FileCompactStrategy createFileCompactStrategy() {
+        return FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+    }
+}