Repository: flink Updated Branches: refs/heads/release-1.6 5d8431474 -> 702f77355
http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java new file mode 100644 index 0000000..61e1433 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;

/**
 * Tests for different {@link RollingPolicy rolling policies}.
 *
 * <p>Every test drives a sink through a test harness and, after each relevant step, uses
 * {@link TestUtils#checkLocalFs(File, int, int)} to compare the number of in-progress and
 * finished files found under the output directory with the expected numbers.
 */
public class RollingPolicyTest {

	@ClassRule
	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

	/**
	 * Exercises a {@link DefaultRollingPolicy} configured with a max part size of 10,
	 * an inactivity interval of 4 and a rollover interval of 11.
	 */
	@Test
	public void testDefaultRollingPolicy() throws Exception {
		final File outputDir = TEMP_FOLDER.newFolder();

		final RollingPolicy<String> policy = DefaultRollingPolicy
			.create()
			.withMaxPartSize(10L)
			.withInactivityInterval(4L)
			.withRolloverInterval(11L)
			.build();

		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> harness =
				newSingleTaskSink(outputDir, 1L, policy)) {

			harness.setup();
			harness.open();
			harness.setProcessingTime(0L);

			// the first element opens the first part file
			harness.processElement(record("test1", 1, 1L));
			TestUtils.checkLocalFs(outputDir, 1, 0);

			// size-based roll: still one file after the second element ...
			harness.processElement(record("test1", 2, 2L));
			TestUtils.checkLocalFs(outputDir, 1, 0);

			// ... and a second one shows up with the third element
			harness.processElement(record("test1", 3, 3L));
			TestUtils.checkLocalFs(outputDir, 2, 0);

			// inactivity-based roll
			harness.setProcessingTime(7L);
			harness.processElement(record("test1", 4, 4L));
			TestUtils.checkLocalFs(outputDir, 3, 0);

			// rollover-interval-based roll
			harness.setProcessingTime(20L);
			harness.processElement(record("test1", 5, 5L));
			TestUtils.checkLocalFs(outputDir, 4, 0);

			// taking a checkpoint must not roll with this policy
			harness.snapshot(1L, 1L);
			TestUtils.checkLocalFs(outputDir, 4, 0);

			// completing the checkpoint publishes the 3 closed files; the open one stays in-progress
			harness.notifyOfCompletedCheckpoint(1L);
			TestUtils.checkLocalFs(outputDir, 1, 3);
		}
	}

	/**
	 * Exercises the {@link OnCheckpointRollingPolicy}: new part files are expected only
	 * after a checkpoint has been taken.
	 */
	@Test
	public void testRollOnCheckpointPolicy() throws Exception {
		final File outputDir = TEMP_FOLDER.newFolder();
		final RollingPolicy<String> policy = new OnCheckpointRollingPolicy<>();

		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> harness =
				newSingleTaskSink(outputDir, 10L, policy)) {

			harness.setup();
			harness.open();
			harness.setProcessingTime(0L);

			// one part file per bucket ("test2" and "test1")
			harness.processElement(record("test2", 1, 1L));
			harness.processElement(record("test1", 1, 1L));
			harness.processElement(record("test1", 2, 2L));
			TestUtils.checkLocalFs(outputDir, 2, 0);

			// more data alone does not trigger a roll
			harness.processElement(record("test1", 3, 3L));
			TestUtils.checkLocalFs(outputDir, 2, 0);

			// the checkpoint rolls the open files
			harness.snapshot(1L, 1L);

			// so the next element needs a new part file
			harness.processElement(record("test1", 4, 4L));
			TestUtils.checkLocalFs(outputDir, 3, 0);

			// these two keep filling .part-0-2.inprogress; nothing has been committed yet
			harness.processElement(record("test1", 5, 5L));
			harness.processElement(record("test1", 6, 6L));
			TestUtils.checkLocalFs(outputDir, 3, 0);

			// the second checkpoint rolls again
			harness.snapshot(2L, 2L);

			harness.processElement(record("test2", 7, 7L));
			TestUtils.checkLocalFs(outputDir, 4, 0);

			// completing the last checkpoint publishes everything except the latest in-progress file
			harness.notifyOfCompletedCheckpoint(2L);
			TestUtils.checkLocalFs(outputDir, 1, 3);
		}
	}

	/**
	 * Exercises a user-defined {@link RollingPolicy} that rolls on every checkpoint and
	 * whenever the part file size exceeds 12, but never based on processing time.
	 */
	@Test
	public void testCustomRollingPolicy() throws Exception {
		final File outputDir = TEMP_FOLDER.newFolder();

		final RollingPolicy<String> policy = new RollingPolicy<String>() {

			private static final long serialVersionUID = 1L;

			@Override
			public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
				return true;
			}

			@Override
			public boolean shouldRollOnEvent(PartFileInfo<String> partFileState) throws IOException {
				// with this threshold, 2 elements are enough to close the part file.
				return partFileState.getSize() > 12L;
			}

			@Override
			public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
				return false;
			}
		};

		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> harness =
				newSingleTaskSink(outputDir, 10L, policy)) {

			harness.setup();
			harness.open();
			harness.setProcessingTime(0L);

			harness.processElement(record("test2", 1, 1L));

			// these 2 elements close a part file ...
			harness.processElement(record("test1", 1, 1L));
			harness.processElement(record("test1", 2, 2L));

			// ... this one opens a new one ...
			harness.processElement(record("test1", 3, 2L));
			TestUtils.checkLocalFs(outputDir, 3, 0);

			// ... and the checkpoint closes every open part file.
			harness.snapshot(1L, 1L);

			// a fresh part file is created and filled
			harness.processElement(record("test1", 4, 4L));
			harness.processElement(record("test1", 5, 5L));
			TestUtils.checkLocalFs(outputDir, 4, 0);

			// the second checkpoint rolls again
			harness.snapshot(2L, 2L);

			// completing the first checkpoint publishes all files but the latest in-progress one
			harness.notifyOfCompletedCheckpoint(1L);
			TestUtils.checkLocalFs(outputDir, 1, 3);
		}
	}

	/**
	 * Creates (without setting up or opening) a test harness around a sink with total
	 * parallelism 1 and task index 0 that buckets with {@link TestUtils.TupleToStringBucketer},
	 * encodes with {@link SimpleStringEncoder} and uses a {@link DefaultBucketFactory}.
	 *
	 * @param outputDir directory the sink writes to
	 * @param bucketCheckInterval bucket check interval handed to the sink
	 * @param policy rolling policy under test
	 * @return the not yet opened harness; the caller is responsible for closing it
	 */
	private static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> newSingleTaskSink(
			final File outputDir,
			final long bucketCheckInterval,
			final RollingPolicy<String> policy) throws Exception {

		return TestUtils.createCustomRescalingTestSink(
			outputDir,
			1,
			0,
			bucketCheckInterval,
			new TestUtils.TupleToStringBucketer(),
			new SimpleStringEncoder<>(),
			policy,
			new DefaultBucketFactory<>());
	}

	/**
	 * Wraps a {@code (bucketId, value)} tuple into a {@link StreamRecord} with the given timestamp.
	 */
	private static StreamRecord<Tuple2<String, Integer>> record(
			final String bucketId,
			final int value,
			final long timestamp) {

		final Tuple2<String, Integer> payload = Tuple2.of(bucketId, value);
		return new StreamRecord<>(payload, timestamp);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java new file mode 100644 index 0000000..184e23e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Utilities for the {@link StreamingFileSink} tests.
 *
 * <p>Provides factory methods that wrap a {@link StreamingFileSink} into a
 * {@link OneInputStreamOperatorTestHarness}, and helpers that inspect the files
 * the sink left in a local output directory.
 */
public class TestUtils {

	/**
	 * Creates a harness around a row-format sink that uses a {@link DefaultRollingPolicy},
	 * a {@link TupleToStringBucketer} and an encoder writing {@code f0@f1} followed by a
	 * newline, encoded as UTF-8.
	 *
	 * <p>Note that {@code inactivityInterval} is used both as the rollover interval and as
	 * the inactivity interval of the policy, and that the bucket check interval is fixed
	 * to {@code 10L}.
	 *
	 * @param outDir directory the sink writes to
	 * @param totalParallelism parallelism handed to the test harness
	 * @param taskIdx subtask index handed to the test harness
	 * @param inactivityInterval value for the policy's rollover and inactivity intervals
	 * @param partMaxSize maximum part size for the policy
	 * @return the harness, neither set up nor opened
	 */
	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(
			File outDir,
			int totalParallelism,
			int taskIdx,
			long inactivityInterval,
			long partMaxSize) throws Exception {

		final RollingPolicy<String> rollingPolicy =
				DefaultRollingPolicy
						.create()
						.withMaxPartSize(partMaxSize)
						.withRolloverInterval(inactivityInterval)
						.withInactivityInterval(inactivityInterval)
						.build();

		final Bucketer<Tuple2<String, Integer>, String> bucketer = new TupleToStringBucketer();

		final Encoder<Tuple2<String, Integer>> encoder = (element, stream) -> {
			stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
			stream.write('\n');
		};

		return createCustomRescalingTestSink(
				outDir,
				totalParallelism,
				taskIdx,
				10L,
				bucketer,
				encoder,
				rollingPolicy,
				new DefaultBucketFactory<>());
	}

	/**
	 * Creates a harness around a row-format sink in which every pluggable component is
	 * supplied by the caller.
	 *
	 * @param outDir directory the sink writes to
	 * @param totalParallelism parallelism handed to the test harness
	 * @param taskIdx subtask index handed to the test harness
	 * @param bucketCheckInterval bucket check interval of the sink
	 * @param bucketer assigns elements to buckets
	 * @param writer encoder used to write the elements
	 * @param rollingPolicy policy deciding when part files roll
	 * @param bucketFactory factory used by the sink to create buckets
	 * @return the harness, neither set up nor opened
	 */
	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(
			final File outDir,
			final int totalParallelism,
			final int taskIdx,
			final long bucketCheckInterval,
			final Bucketer<Tuple2<String, Integer>, String> bucketer,
			final Encoder<Tuple2<String, Integer>> writer,
			final RollingPolicy<String> rollingPolicy,
			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {

		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
				.forRowFormat(new Path(outDir.toURI()), writer)
				.withBucketer(bucketer)
				.withRollingPolicy(rollingPolicy)
				.withBucketCheckInterval(bucketCheckInterval)
				.withBucketFactory(bucketFactory)
				.build();

		// NOTE(review): the leading 10 is presumably the harness' max parallelism - confirm.
		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
	}

	/**
	 * Creates a harness around a bulk-format sink. No rolling policy is configured here.
	 *
	 * @param outDir directory the sink writes to
	 * @param totalParallelism parallelism handed to the test harness
	 * @param taskIdx subtask index handed to the test harness
	 * @param bucketCheckInterval bucket check interval of the sink
	 * @param bucketer assigns elements to buckets
	 * @param writer factory of the bulk writers used to write the elements
	 * @param bucketFactory factory used by the sink to create buckets
	 * @return the harness, neither set up nor opened
	 */
	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder(
			final File outDir,
			final int totalParallelism,
			final int taskIdx,
			final long bucketCheckInterval,
			final Bucketer<Tuple2<String, Integer>, String> bucketer,
			final BulkWriter.Factory<Tuple2<String, Integer>> writer,
			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {

		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
				.forBulkFormat(new Path(outDir.toURI()), writer)
				.withBucketer(bucketer)
				.withBucketCheckInterval(bucketCheckInterval)
				.withBucketFactory(bucketFactory)
				.build();

		// NOTE(review): the leading 10 is presumably the harness' max parallelism - confirm.
		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
	}

	/**
	 * Recursively counts the files below {@code outDir} and asserts the counts.
	 *
	 * <p>Files whose absolute path ends with {@code "crc"} are ignored. Of the remaining
	 * files, those whose name starts with {@code "."} count as in-progress and all others
	 * count as finished.
	 *
	 * @param outDir directory to scan
	 * @param expectedInProgress expected number of in-progress files
	 * @param expectedCompleted expected number of finished files
	 */
	static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
		int inProgress = 0;
		int finished = 0;

		for (File file: FileUtils.listFiles(outDir, null, true)) {
			if (file.getAbsolutePath().endsWith("crc")) {
				continue;
			}

			if (file.toPath().getFileName().toString().startsWith(".")) {
				inProgress++;
			} else {
				finished++;
			}
		}

		// the messages make it obvious which of the two counts is off when a test fails
		Assert.assertEquals("Unexpected number of in-progress part files.", expectedInProgress, inProgress);
		Assert.assertEquals("Unexpected number of finished part files.", expectedCompleted, finished);
	}

	/**
	 * Reads every file found recursively below {@code directory} into a string.
	 *
	 * @param directory directory to scan
	 * @return a map from each file to its content, decoded as UTF-8
	 * @throws IOException if a file cannot be read
	 */
	static Map<File, String> getFileContentByPath(File directory) throws IOException {
		Map<File, String> contents = new HashMap<>(4);

		final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
		for (File file : filesInBucket) {
			// decode explicitly as UTF-8 (the charset the test encoder in this class writes with)
			// instead of relying on the platform default charset
			contents.put(file, FileUtils.readFileToString(file, StandardCharsets.UTF_8));
		}
		return contents;
	}

	/**
	 * A {@link Bucketer} that uses the first tuple field ({@code f0}) as the bucket id.
	 */
	static class TupleToStringBucketer implements Bucketer<Tuple2<String, Integer>, String> {

		private static final long serialVersionUID = 1L;

		@Override
		public String getBucketId(Tuple2<String, Integer> element, Context context) {
			return element.f0;
		}

		@Override
		public SimpleVersionedSerializer<String> getSerializer() {
			return SimpleVersionedStringSerializer.INSTANCE;
		}
	}
}
