Repository: flink
Updated Branches:
  refs/heads/master 8195001bb -> e99949a5c


http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..40e0cf5
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BucketingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly once behaviour of the sink. Another 
test tests the
+ * rolling behaviour.
+ */
+public class BucketingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBase {
+
+       final long NUM_STRINGS = 16_000;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static MiniDFSCluster hdfsCluster;
+       private static org.apache.hadoop.fs.FileSystem dfs;
+
+       private static String outPath;
+
+       @BeforeClass
+       public static void createHDFS() throws IOException {
+               Configuration conf = new Configuration();
+
+               File dataDir = tempFolder.newFolder();
+
+               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
+               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
+               hdfsCluster = builder.build();
+
+               dfs = hdfsCluster.getFileSystem();
+
+               outPath = "hdfs://"
+                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
+                               + "/string-non-rolling-out";
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               if (hdfsCluster != null) {
+                       hdfsCluster.shutdown();
+               }
+       }
+
+
+       @Override
+       public void testProgram(StreamExecutionEnvironment env) {
+               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+               int PARALLELISM = 12;
+
+               env.enableCheckpointing(200);
+               env.setParallelism(PARALLELISM);
+               env.disableOperatorChaining();
+
+               DataStream<String> stream = env.addSource(new 
StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+               DataStream<String> mapped = stream
+                               .map(new 
OnceFailingIdentityMapper(NUM_STRINGS));
+
+               BucketingSink<String> sink = new BucketingSink<String>(outPath)
+                               .setBucketer(new BasePathBucketer<String>())
+                               .setBatchSize(10000)
+                               .setValidLengthPrefix("")
+                               .setPendingPrefix("");
+
+               mapped.addSink(sink);
+
+       }
+
+       @Override
+       public void postSubmit() throws Exception {
+               // We read the files and verify that we have read all the 
strings. If a valid-length
+               // file exists we only read the file to that point. (This test 
should work with
+               // FileSystems that support truncate() and with others as well.)
+
+               Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+               // Keep a set of the message IDs that we read. The size must 
equal the read count and
+               // the NUM_STRINGS. If numRead is bigger than the size of the 
set we have seen some
+               // elements twice.
+               Set<Integer> readNumbers = Sets.newHashSet();
+               int numRead = 0;
+
+               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(
+                               outPath), true);
+
+               while (files.hasNext()) {
+                       LocatedFileStatus file = files.next();
+
+                       if 
(!file.getPath().toString().endsWith(".valid-length")) {
+                               int validLength = (int) file.getLen();
+                               if 
(dfs.exists(file.getPath().suffix(".valid-length"))) {
+                                       FSDataInputStream inStream = 
dfs.open(file.getPath().suffix(".valid-length"));
+                                       String validLengthString = 
inStream.readUTF();
+                                       validLength = 
Integer.parseInt(validLengthString);
+                                       System.out.println("VALID LENGTH: " + 
validLength);
+                               }
+                               FSDataInputStream inStream = 
dfs.open(file.getPath());
+                               byte[] buffer = new byte[validLength];
+                               inStream.readFully(0, buffer, 0, validLength);
+                               inStream.close();
+
+                               ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer);
+
+                               InputStreamReader inStreamReader = new 
InputStreamReader(bais);
+                               BufferedReader br = new 
BufferedReader(inStreamReader);
+
+                               String line = br.readLine();
+                               while (line != null) {
+                                       Matcher matcher = 
messageRegex.matcher(line);
+                                       if (matcher.matches()) {
+                                               numRead++;
+                                               int messageId = 
Integer.parseInt(matcher.group(1));
+                                               readNumbers.add(messageId);
+                                       } else {
+                                               Assert.fail("Read line does not 
match expected pattern.");
+                                       }
+                                       line = br.readLine();
+                               }
+                               br.close();
+                               inStreamReader.close();
+                               bais.close();
+                       }
+               }
+
+               // Verify that we read all strings (at-least-once)
+               Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+               // Verify that we don't have duplicates (boom!, exactly-once)
+               Assert.assertEquals(NUM_STRINGS, numRead);
+       }
+
+       private static class OnceFailingIdentityMapper extends 
RichMapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               private static volatile boolean hasFailed = false;
+
+               private final long numElements;
+
+               private long failurePos;
+               private long count;
+
+
+               OnceFailingIdentityMapper(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void open(org.apache.flink.configuration.Configuration 
parameters) throws IOException {
+                       long failurePosMin = (long) (0.7 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
+                       long failurePosMax = (long) (0.9 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
+                       count = 0;
+               }
+
+               @Override
+               public String map(String value) throws Exception {
+                       count++;
+                       if (!hasFailed && count >= failurePos) {
+                               hasFailed = true;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
+                       implements CheckpointedAsynchronously<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final long numElements;
+
+               private int index;
+
+               private volatile boolean isRunning = true;
+
+
+               StringGeneratingSourceFunction(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       final Object lockingObject = ctx.getCheckpointLock();
+
+                       final int step = 
getRuntimeContext().getNumberOfParallelSubtasks();
+
+                       if (index == 0) {
+                               index = 
getRuntimeContext().getIndexOfThisSubtask();
+                       }
+
+                       while (isRunning && index < numElements) {
+
+                               Thread.sleep(1);
+                               synchronized (lockingObject) {
+                                       ctx.collect("message " + index);
+                                       index += step;
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               private static String randomString(StringBuilder bld, Random 
rnd) {
+                       final int len = rnd.nextInt(10) + 5;
+
+                       for (int i = 0; i < len; i++) {
+                               char next = (char) (rnd.nextInt(20000) + 33);
+                               bld.append(next);
+                       }
+
+                       return bld.toString();
+               }
+
+               @Override
+               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return index;
+               }
+
+               @Override
+               public void restoreState(Integer state) {
+                       index = state;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
new file mode 100644
index 0000000..828dcc9
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class BucketingSinkTest {
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static MiniDFSCluster hdfsCluster;
+       private static org.apache.hadoop.fs.FileSystem dfs;
+       private static String hdfsURI;
+
+       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir, TimeServiceProvider clock) {
+               BucketingSink<String> sink = new 
BucketingSink<String>(dataDir.getAbsolutePath())
+                       .setBucketer(new Bucketer<String>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public Path getBucketPath(Clock clock, Path 
basePath, String element) {
+                                       return new Path(basePath, element);
+                               }
+                       })
+                       .setWriter(new StringWriter<String>())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setInactiveBucketCheckInterval(5*60*1000L)
+                       .setInactiveBucketThreshold(5*60*1000L)
+                       .setPendingSuffix(".pending");
+
+               return createTestSink(sink, clock);
+       }
+
+       private <T> OneInputStreamOperatorTestHarness<T, Object> 
createTestSink(BucketingSink<T> sink,
+                                                                               
                                                                        
TimeServiceProvider clock) {
+               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), new ExecutionConfig(), clock);
+       }
+
+       @BeforeClass
+       public static void createHDFS() throws IOException {
+               Configuration conf = new Configuration();
+
+               File dataDir = tempFolder.newFolder();
+
+               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
+               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
+               hdfsCluster = builder.build();
+
+               dfs = hdfsCluster.getFileSystem();
+
+               hdfsURI = "hdfs://"
+                       + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
+                       + "/";
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               hdfsCluster.shutdown();
+       }
+
+       @Test
+       public void testCheckpointWithoutNotify() throws Exception {
+               File dataDir = tempFolder.newFolder();
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>("Hello"));
+               testHarness.processElement(new StreamRecord<>("Hello"));
+               testHarness.processElement(new StreamRecord<>("Hello"));
+
+               clock.setCurrentTime(10000L);
+
+               // snapshot but don't call notify to simulate a notify that 
never
+               // arrives, the sink should move pending files in restore() in 
that case
+               StreamTaskState snapshot1 = testHarness.snapshot(0, 0);
+
+               testHarness = createTestSink(dataDir, clock);
+               testHarness.setup();
+               testHarness.restore(snapshot1, 1);
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>("Hello"));
+
+               testHarness.close();
+
+               int numComplete = 0;
+               int numPending = 0;
+               for (File file: FileUtils.listFiles(dataDir, null, true)) {
+                       if (file.getAbsolutePath().endsWith("crc")) {
+                               continue;
+                       }
+                       if (file.getPath().contains("pending")) {
+                               numPending++;
+                       } else if (file.getName().startsWith("part")) {
+                               numComplete++;
+                       }
+               }
+
+               Assert.assertEquals(1, numComplete);
+               Assert.assertEquals(1, numPending);
+       }
+
+       /**
+        * This tests {@link StringWriter} with
+        * non-bucketing output.
+        */
+       @Test
+       public void testNonRollingStringWriter() throws Exception {
+               final String outPath = hdfsURI + "/string-non-rolling-out";
+
+               final int numElements = 20;
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               BucketingSink<String> sink = new BucketingSink<String>(outPath)
+                       .setBucketer(new BasePathBucketer<String>())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>("message 
#" + Integer.toString(i)));
+               }
+
+               testHarness.close();
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
+
+               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
+
+               for (int i = 0; i < numElements; i++) {
+                       String line = br.readLine();
+                       Assert.assertEquals("message #" + i, line);
+               }
+
+               inStream.close();
+       }
+
+       /**
+        * This tests {@link SequenceFileWriter}
+        * with non-rolling output and without compression.
+        */
+       @Test
+       public void testNonRollingSequenceFileWithoutCompressionWriter() throws 
Exception {
+               final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+
+               final int numElements = 20;
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               BucketingSink<Tuple2<IntWritable, Text>> sink = new 
BucketingSink<Tuple2<IntWritable, Text>>(outPath)
+                       .setWriter(new SequenceFileWriter<IntWritable, Text>())
+                       .setBucketer(new BasePathBucketer<Tuple2<IntWritable, 
Text>>())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               sink.setInputType(TypeInformation.of(new 
TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
+
+               OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, 
Object> testHarness =
+                       createTestSink(sink, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
+                               new IntWritable(i),
+                               new Text("message #" + Integer.toString(i))
+                       )));
+               }
+
+               testHarness.close();
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
+
+               SequenceFile.Reader reader = new SequenceFile.Reader(inStream, 
1000, 0, 100000, new Configuration());
+
+               IntWritable intWritable = new IntWritable();
+               Text txt = new Text();
+
+               for (int i = 0; i < numElements; i++) {
+                       reader.next(intWritable, txt);
+                       Assert.assertEquals(i, intWritable.get());
+                       Assert.assertEquals("message #" + i, txt.toString());
+               }
+
+               reader.close();
+               inStream.close();
+       }
+
+       /**
+        * This tests {@link AvroKeyValueSinkWriter}
+        * with non-rolling output and with compression.
+        */
+       @Test
+       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {
+               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
+
+               final int numElements = 20;
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               Map<String, String> properties = new HashMap<>();
+               Schema keySchema = Schema.create(Schema.Type.INT);
+               Schema valueSchema = Schema.create(Schema.Type.STRING);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
String.valueOf(true));
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
+
+               BucketingSink<Tuple2<Integer, String>> sink = new 
BucketingSink<Tuple2<Integer, String>>(outPath)
+                       .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
+                       .setBucketer(new BasePathBucketer<Tuple2<Integer, 
String>>())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Object> testHarness =
+                       createTestSink(sink, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
+                               i, "message #" + Integer.toString(i)
+                       )));
+               }
+
+               testHarness.close();
+
+               GenericData.setStringType(valueSchema, 
GenericData.StringType.String);
+               Schema elementSchema = 
AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
+
+               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<>(elementSchema);
+               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(inStream, elementReader);
+               for (int i = 0; i < numElements; i++) {
+                       AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> 
wrappedEntry =
+                               new 
AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+                       int key = wrappedEntry.getKey();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+       }
+
+       /**
+        * This uses {@link DateTimeBucketer} to
+        * produce rolling files. A custom {@link TimeServiceProvider} is set
+        * to simulate the advancing of time alongside the processing of 
elements.
+        */
+       @Test
+       public void testDateTimeRollingStringWriter() throws Exception {
+               final int numElements = 20;
+
+               final String outPath = hdfsURI + "/rolling-out";
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               BucketingSink<String> sink = new BucketingSink<String>(outPath)
+                       .setBucketer(new DateTimeBucketer<String>("ss"))
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       // Every 5 elements, increase the clock time. We should 
end up with 5 elements per bucket.
+                       if (i % 5 == 0) {
+                               clock.setCurrentTime(i * 1000L);
+                       }
+                       testHarness.processElement(new StreamRecord<>("message 
#" + Integer.toString(i)));
+               }
+
+               testHarness.close();
+
+               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(outPath), true);
+
+               // We should have 4 rolling files across 4 time intervals
+               int numFiles = 0;
+               while (files.hasNext()) {
+                       LocatedFileStatus file = files.next();
+                       numFiles++;
+                       if 
(file.getPath().toString().contains("rolling-out/00")) {
+                               FSDataInputStream inStream = 
dfs.open(file.getPath());
+
+                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
+
+                               for (int i = 0; i < 5; i++) {
+                                       String line = br.readLine();
+                                       Assert.assertEquals("message #" + i, 
line);
+                               }
+
+                               inStream.close();
+                       } else if 
(file.getPath().toString().contains("rolling-out/05")) {
+                               FSDataInputStream inStream = 
dfs.open(file.getPath());
+
+                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
+
+                               for (int i = 5; i < 10; i++) {
+                                       String line = br.readLine();
+                                       Assert.assertEquals("message #" + i, 
line);
+                               }
+
+                               inStream.close();
+                       } else if 
(file.getPath().toString().contains("rolling-out/10")) {
+                               FSDataInputStream inStream = 
dfs.open(file.getPath());
+
+                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
+
+                               for (int i = 10; i < 15; i++) {
+                                       String line = br.readLine();
+                                       Assert.assertEquals("message #" + i, 
line);
+                               }
+
+                               inStream.close();
+                       } else if 
(file.getPath().toString().contains("rolling-out/15")) {
+                               FSDataInputStream inStream = 
dfs.open(file.getPath());
+
+                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
+
+                               for (int i = 15; i < 20; i++) {
+                                       String line = br.readLine();
+                                       Assert.assertEquals("message #" + i, 
line);
+                               }
+
+                               inStream.close();
+                       } else {
+                               Assert.fail("File " + file + " does not match 
any expected roll pattern.");
+                       }
+               }
+
+               Assert.assertEquals(4, numFiles);
+       }
+
+       /**
+        * This uses a custom bucketing function which determines the bucket 
from the input.
+        */
+       @Test
+       public void testCustomBucketing() throws Exception {
+               File dataDir = tempFolder.newFolder();
+
+               final int numIds = 4;
+               final int numElements = 20;
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % numIds)));
+               }
+
+               testHarness.close();
+
+               // we should have 4 buckets, with 1 file each
+               int numFiles = 0;
+               for (File file: FileUtils.listFiles(dataDir, null, true)) {
+                       if (file.getName().startsWith("part")) {
+                               numFiles++;
+                       }
+               }
+
+               Assert.assertEquals(4, numFiles);
+       }
+
+       /**
+        * This uses a custom bucketing function which determines the bucket 
from the input.
+        * We use a simulated clock to reduce the number of buckets being 
written to over time.
+        * This causes buckets to become 'inactive' and their file parts 
'closed' by the sink.
+        */
+       @Test
+       public void testCustomBucketingInactiveBucketCleanup() throws Exception 
{
+               File dataDir = tempFolder.newFolder();
+
+               final int step1NumIds = 4;
+               final int step2NumIds = 2;
+               final int numElementsPerStep = 20;
+
+               TestTimeServiceProvider clock = new TestTimeServiceProvider();
+               clock.setCurrentTime(0L);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElementsPerStep; i++) {
+                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step1NumIds)));
+               }
+
+               clock.setCurrentTime(2*60*1000L);
+
+               for (int i = 0; i < numElementsPerStep; i++) {
+                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));
+               }
+
+               clock.setCurrentTime(6*60*1000L);
+
+               for (int i = 0; i < numElementsPerStep; i++) {
+                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));
+               }
+
+               // we should have 4 buckets, with 1 file each
+               // 2 of these buckets should have been finalised due to 
becoming inactive
+               int numFiles = 0;
+               int numInProgress = 0;
+               for (File file: FileUtils.listFiles(dataDir, null, true)) {
+                       if (file.getAbsolutePath().endsWith("crc")) {
+                               continue;
+                       }
+                       if (file.getPath().contains("in-progress")) {
+                               numInProgress++;
+                       }
+                       numFiles++;
+               }
+
+               testHarness.close();
+
+               Assert.assertEquals(4, numFiles);
+               Assert.assertEquals(2, numInProgress);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e99949a5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 40086c5..6855989 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -120,6 +120,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                when(mockTask.getConfiguration()).thenReturn(config);
                when(mockTask.getEnvironment()).thenReturn(env);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+               
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
 
                try {
                        doAnswer(new Answer<AbstractStateBackend>() {
@@ -205,7 +206,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long,
 long)} ()}
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long,
 long)}
         */
        public StreamTaskState snapshot(long checkpointId, long timestamp) 
throws Exception {
                StreamTaskState snapshot = 
operator.snapshotOperatorState(checkpointId, timestamp);
@@ -235,7 +236,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)}
 ()}
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
+        */
+       public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
+               operator.notifyOfCompletedCheckpoint(checkpointId);
+       }
+
+       /**
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)}
         */
        public void restore(StreamTaskState snapshot, long recoveryTimestamp) 
throws Exception {
                operator.restoreState(snapshot);

Reply via email to