Repository: flink
Updated Branches:
  refs/heads/release-1.6 5d8431474 -> 702f77355


http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
new file mode 100644
index 0000000..61e1433
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Tests for different {@link RollingPolicy rolling policies}.
+ *
+ * <p>Records are bucketed by their {@code f0} field, so {@code "test1"} and {@code "test2"} records
+ * end up in different buckets. {@link TestUtils#checkLocalFs(File, int, int)} counts dot-prefixed
+ * files (in-progress or pending part files) separately from committed ones.
+ */
+public class RollingPolicyTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testDefaultRollingPolicy() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+				.create()
+				.withMaxPartSize(10L)
+				.withInactivityInterval(4L)
+				.withRolloverInterval(11L)
+				.build();
+
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+				createHarness(outDir, 1L, rollingPolicy)) {
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			TestUtils.checkLocalFs(outDir, 1, 0);
+
+			// the second record is still appended to the current part file ...
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+			TestUtils.checkLocalFs(outDir, 1, 0);
+
+			// ... but the third one triggers a roll due to size and opens a new part file
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+			TestUtils.checkLocalFs(outDir, 2, 0);
+
+			// roll due to inactivity
+			testHarness.setProcessingTime(7L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
+			TestUtils.checkLocalFs(outDir, 3, 0);
+
+			// roll due to rollover interval
+			testHarness.setProcessingTime(20L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
+			TestUtils.checkLocalFs(outDir, 4, 0);
+
+			// we take a checkpoint but the default policy should not roll the open part file
+			testHarness.snapshot(1L, 1L);
+
+			TestUtils.checkLocalFs(outDir, 4, 0);
+
+			// acknowledge the checkpoint: the 3 closed files are published, the open one is not
+			testHarness.notifyOfCompletedCheckpoint(1L);
+			TestUtils.checkLocalFs(outDir, 1, 3);
+		}
+	}
+
+	@Test
+	public void testRollOnCheckpointPolicy() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		final RollingPolicy<String> rollingPolicy = new OnCheckpointRollingPolicy<>();
+
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+				createHarness(outDir, 10L, rollingPolicy)) {
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			// one part file for bucket "test2" and one for bucket "test1"
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+			TestUtils.checkLocalFs(outDir, 2, 0);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+			TestUtils.checkLocalFs(outDir, 2, 0);
+
+			// we take a checkpoint so we roll all open part files.
+			testHarness.snapshot(1L, 1L);
+
+			// the next "test1" record opens a new part file ...
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
+			TestUtils.checkLocalFs(outDir, 3, 0);
+
+			// ... and the following ones are appended to it; nothing is committed yet
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
+			TestUtils.checkLocalFs(outDir, 3, 0);
+
+			// we take a second checkpoint so we roll again.
+			testHarness.snapshot(2L, 2L);
+
+			// bucket "test2" opens a new part file
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
+			TestUtils.checkLocalFs(outDir, 4, 0);
+
+			// acknowledging checkpoint 2 publishes every file rolled on checkpoints 1 and 2;
+			// only the part file opened after checkpoint 2 stays in progress
+			testHarness.notifyOfCompletedCheckpoint(2L);
+			TestUtils.checkLocalFs(outDir, 1, 3);
+		}
+	}
+
+	@Test
+	public void testCustomRollingPolicy() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		final RollingPolicy<String> rollingPolicy = new CustomRollingPolicy();
+
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+				createHarness(outDir, 10L, rollingPolicy)) {
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+
+			// the following 2 elements fill a part file of bucket "test1" ...
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+
+			// ... so this one rolls it and opens a new one ...
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+			TestUtils.checkLocalFs(outDir, 3, 0);
+
+			// ... and all open part files should close here.
+			testHarness.snapshot(1L, 1L);
+
+			// this will create and fill out a new part file
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
+			TestUtils.checkLocalFs(outDir, 4, 0);
+
+			// we take a checkpoint so we roll.
+			testHarness.snapshot(2L, 2L);
+
+			// acknowledging only checkpoint 1 publishes the 3 files closed before it; the file rolled
+			// on checkpoint 2 is still pending (dot-prefixed) until that checkpoint is acknowledged
+			testHarness.notifyOfCompletedCheckpoint(1L);
+			TestUtils.checkLocalFs(outDir, 1, 3);
+		}
+	}
+
+	/**
+	 * Creates a single-subtask harness whose sink buckets records by {@code f0}, writes them with a
+	 * {@link SimpleStringEncoder} and rolls part files according to the given policy.
+	 *
+	 * <p>The harness is neither set up nor opened, so callers can do that inside their
+	 * try-with-resources block and the harness is still closed if opening fails.
+	 */
+	private static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createHarness(
+			final File outDir,
+			final long bucketCheckInterval,
+			final RollingPolicy<String> rollingPolicy) throws Exception {
+
+		return TestUtils.createCustomRescalingTestSink(
+				outDir,
+				1,
+				0,
+				bucketCheckInterval,
+				new TestUtils.TupleToStringBucketer(),
+				new SimpleStringEncoder<>(),
+				rollingPolicy,
+				new DefaultBucketFactory<>());
+	}
+
+	/**
+	 * A policy that always rolls on checkpoints, never on processing time, and on events once the
+	 * in-progress part file is larger than 12 bytes.
+	 *
+	 * <p>This is a static nested class rather than an anonymous class inside the test method. An
+	 * anonymous class would capture the enclosing {@link RollingPolicyTest} instance, which is not
+	 * {@code Serializable}, while the policy itself declares a {@code serialVersionUID}.
+	 */
+	private static final class CustomRollingPolicy implements RollingPolicy<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
+			return true;
+		}
+
+		@Override
+		public boolean shouldRollOnEvent(PartFileInfo<String> partFileState) throws IOException {
+			// with the records used in the test, a part file holding 2 records exceeds this size,
+			// so the third record written to the bucket rolls it
+			return partFileState.getSize() > 12L;
+		}
+
+		@Override
+		public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f998f0f9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
new file mode 100644
index 0000000..184e23e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utilities for the {@link StreamingFileSink} tests.
+ */
+public class TestUtils {
+
+	/**
+	 * Creates a harness for a row-format sink with the following configuration:
+	 * <ul>
+	 *     <li>records are bucketed by {@code f0};</li>
+	 *     <li>each record is written as a UTF-8 {@code "f0@f1"} line;</li>
+	 *     <li>the bucket check interval is {@code 10L};</li>
+	 *     <li>part files are rolled by a {@link DefaultRollingPolicy}.</li>
+	 * </ul>
+	 *
+	 * <p>Note that {@code inactivityInterval} is used for both the inactivity interval and the
+	 * rollover interval of the policy.
+	 *
+	 * @param outDir the base directory of the sink
+	 * @param totalParallelism the parallelism of the harness
+	 * @param taskIdx the subtask index of the harness
+	 * @param inactivityInterval the inactivity and rollover interval of the rolling policy
+	 * @param partMaxSize the maximum part size of the rolling policy
+	 */
+	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(
+			File outDir,
+			int totalParallelism,
+			int taskIdx,
+			long inactivityInterval,
+			long partMaxSize) throws Exception {
+
+		final RollingPolicy<String> rollingPolicy =
+				DefaultRollingPolicy
+						.create()
+						.withMaxPartSize(partMaxSize)
+						.withRolloverInterval(inactivityInterval)
+						.withInactivityInterval(inactivityInterval)
+						.build();
+
+		final Bucketer<Tuple2<String, Integer>, String> bucketer = new TupleToStringBucketer();
+
+		final Encoder<Tuple2<String, Integer>> encoder = (element, stream) -> {
+			stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
+			stream.write('\n');
+		};
+
+		return createCustomRescalingTestSink(
+				outDir,
+				totalParallelism,
+				taskIdx,
+				10L,
+				bucketer,
+				encoder,
+				rollingPolicy,
+				new DefaultBucketFactory<>());
+	}
+
+	/**
+	 * Creates a harness for a row-format {@link StreamingFileSink} whose bucketer, encoder, rolling
+	 * policy, bucket factory and bucket check interval are all supplied by the caller.
+	 *
+	 * <p>The returned harness is neither set up nor opened.
+	 */
+	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(
+			final File outDir,
+			final int totalParallelism,
+			final int taskIdx,
+			final long bucketCheckInterval,
+			final Bucketer<Tuple2<String, Integer>, String> bucketer,
+			final Encoder<Tuple2<String, Integer>> writer,
+			final RollingPolicy<String> rollingPolicy,
+			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
+
+		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
+				.forRowFormat(new Path(outDir.toURI()), writer)
+				.withBucketer(bucketer)
+				.withRollingPolicy(rollingPolicy)
+				.withBucketCheckInterval(bucketCheckInterval)
+				.withBucketFactory(bucketFactory)
+				.build();
+
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
+	}
+
+	/**
+	 * Creates a harness for a bulk-format {@link StreamingFileSink} using the given
+	 * {@link BulkWriter.Factory}.
+	 *
+	 * <p>No rolling policy is configured here, so the builder's default for bulk formats applies.
+	 * The returned harness is neither set up nor opened.
+	 */
+	static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder(
+			final File outDir,
+			final int totalParallelism,
+			final int taskIdx,
+			final long bucketCheckInterval,
+			final Bucketer<Tuple2<String, Integer>, String> bucketer,
+			final BulkWriter.Factory<Tuple2<String, Integer>> writer,
+			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
+
+		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
+				.forBulkFormat(new Path(outDir.toURI()), writer)
+				.withBucketer(bucketer)
+				.withBucketCheckInterval(bucketCheckInterval)
+				.withBucketFactory(bucketFactory)
+				.build();
+
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
+	}
+
+	/**
+	 * Recursively counts the files below {@code outDir} and asserts how many are in progress and
+	 * how many are completed.
+	 *
+	 * <p>Files whose name starts with a dot count as in-progress (this includes closed but not yet
+	 * committed part files). All other files count as completed. Files ending in {@code "crc"}
+	 * (presumably checksum side files) are ignored.
+	 */
+	static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
+		int inProgress = 0;
+		int finished = 0;
+
+		for (File file : FileUtils.listFiles(outDir, null, true)) {
+			final String fileName = file.getName();
+
+			if (fileName.endsWith("crc")) {
+				continue;
+			}
+
+			if (fileName.startsWith(".")) {
+				inProgress++;
+			} else {
+				finished++;
+			}
+		}
+
+		Assert.assertEquals("Unexpected number of in-progress files in " + outDir, expectedInProgress, inProgress);
+		Assert.assertEquals("Unexpected number of completed files in " + outDir, expectedCompleted, finished);
+	}
+
+	/**
+	 * Reads every file below {@code directory} (recursively) as a UTF-8 string.
+	 *
+	 * <p>UTF-8 is used explicitly instead of the platform default charset, because the test
+	 * encoders write UTF-8 bytes.
+	 *
+	 * @return a map from each file to its content
+	 */
+	static Map<File, String> getFileContentByPath(File directory) throws IOException {
+		Map<File, String> contents = new HashMap<>(4);
+
+		final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+		for (File file : filesInBucket) {
+			contents.put(file, FileUtils.readFileToString(file, StandardCharsets.UTF_8));
+		}
+		return contents;
+	}
+
+	/**
+	 * A {@link Bucketer} that uses the {@code f0} field of each record as its bucket id.
+	 */
+	static class TupleToStringBucketer implements Bucketer<Tuple2<String, Integer>, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getBucketId(Tuple2<String, Integer> element, Context context) {
+			return element.f0;
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+}

Reply via email to