http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java new file mode 100644 index 0000000..80ae294 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -0,0 +1,991 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData.StringType; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.apache.flink.util.NetUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; 
+ +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for {@link RollingSink}. These + * tests test the different output methods as well as the rolling feature using a manual clock + * that increases time in lockstep with element computation using latches. + * + * <p> + * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies + * exactly once behaviour. + * + * @deprecated should be removed with the {@link RollingSink}. + */ +@Deprecated +public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { + + protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class); + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + protected static MiniDFSCluster hdfsCluster; + protected static org.apache.hadoop.fs.FileSystem dfs; + protected static String hdfsURI; + protected static Configuration conf = new Configuration(); + + protected static File dataDir; + + @BeforeClass + public static void createHDFS() throws IOException { + + LOG.info("In RollingSinkITCase: Starting MiniDFSCluster "); + + dataDir = tempFolder.newFolder(); + + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + hdfsCluster = builder.build(); + + dfs = hdfsCluster.getFileSystem(); + + hdfsURI = "hdfs://" + + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) + + "/"; + } + + @AfterClass + public static void destroyHDFS() { + LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster "); + 
hdfsCluster.shutdown(); + } + + /** + * This tests {@link StringWriter} with + * non-rolling output. + */ + @Test + public void testNonRollingStringWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/string-non-rolling-out"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + RollingSink<String> sink = new RollingSink<String>(outPath) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source + .map(new MapFunction<Tuple2<Integer,String>, String>() { + private static final long serialVersionUID = 1L; + @Override + public String map(Tuple2<Integer, String> value) throws Exception { + return value.f1; + } + }) + .addSink(sink); + + env.execute("RollingSink String Write Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + + br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } + + /** + * This tests {@link SequenceFileWriter} + * with non-rolling output and without compression. 
+ */ + @Test + public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception { + return Tuple2.of(new IntWritable(value.f0), new Text(value.f1)); + } + }); + + + RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath) + .setWriter(new SequenceFileWriter<IntWritable, Text>()) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + mapped.addSink(sink); + + env.execute("RollingSink String Write Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + SequenceFile.Reader reader = new SequenceFile.Reader(inStream, + 1000, + 0, + 100000, + new Configuration()); + + IntWritable intWritable = new IntWritable(); + Text txt = new Text(); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + reader.next(intWritable, txt); + Assert.assertEquals(i, intWritable.get()); + Assert.assertEquals("message #" + i, txt.toString()); + } + + reader.close(); + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + + reader = new SequenceFile.Reader(inStream, + 1000, + 0, + 100000, + new Configuration()); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + reader.next(intWritable, txt); + Assert.assertEquals(i, intWritable.get()); + 
Assert.assertEquals("message #" + i, txt.toString()); + } + + reader.close(); + inStream.close(); + } + + /** + * This tests {@link SequenceFileWriter} + * with non-rolling output but with compression. + */ + @Test + public void testNonRollingSequenceFileWithCompressionWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/seq-non-rolling-out"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception { + return Tuple2.of(new IntWritable(value.f0), new Text(value.f1)); + } + }); + + + RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath) + .setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK)) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + mapped.addSink(sink); + + env.execute("RollingSink String Write Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + SequenceFile.Reader reader = new SequenceFile.Reader(inStream, + 1000, + 0, + 100000, + new Configuration()); + + IntWritable intWritable = new IntWritable(); + Text txt = new Text(); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + reader.next(intWritable, txt); + Assert.assertEquals(i, intWritable.get()); + Assert.assertEquals("message #" + i, txt.toString()); + } + + reader.close(); + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); 
+ + reader = new SequenceFile.Reader(inStream, + 1000, + 0, + 100000, + new Configuration()); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + reader.next(intWritable, txt); + Assert.assertEquals(i, intWritable.get()); + Assert.assertEquals("message #" + i, txt.toString()); + } + + reader.close(); + inStream.close(); + } + + + /** + * This tests {@link AvroKeyValueSinkWriter} + * with non-rolling output and without compression. + */ + @Test + public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + + Map<String, String> properties = new HashMap<>(); + Schema keySchema = Schema.create(Type.INT); + Schema valueSchema = Schema.create(Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); + RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath) + .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source.addSink(sink); + + env.execute("RollingSink Avro KeyValue Writer Test"); + + GenericData.setStringType(valueSchema, StringType.String); + Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema); + DataFileStream<GenericRecord> dataFileStream = 
new DataFileStream<GenericRecord>(inStream, elementReader); + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); + int key = wrappedEntry.getKey().intValue(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); + int key = wrappedEntry.getKey().intValue(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + + /** + * This tests {@link AvroKeyValueSinkWriter} + * with non-rolling output and with compression. 
+ */ + @Test + public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + + Map<String, String> properties = new HashMap<>(); + Schema keySchema = Schema.create(Type.INT); + Schema valueSchema = Schema.create(Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath) + .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source.addSink(sink); + + env.execute("RollingSink Avro KeyValue Writer Test"); + + GenericData.setStringType(valueSchema, StringType.String); + Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema); + DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); + int key = wrappedEntry.getKey().intValue(); + 
Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); + int key = wrappedEntry.getKey().intValue(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + + + /** + * This tests user defined hdfs configuration + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); + RollingSink<String> sink = new RollingSink<String>(outPath) + .setFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960")) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source + .map(new MapFunction<Tuple2<Integer,String>, String>() { + private static final long serialVersionUID = 1L; + @Override + public String map(Tuple2<Integer, String> value) throws Exception { + return value.f1; + } + }) + .addSink(sink); + + env.execute("RollingSink with configuration Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + 
BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + + br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } + + // we use this to synchronize the clock changes to elements being processed + final static MultiShotLatch latch1 = new MultiShotLatch(); + final static MultiShotLatch latch2 = new MultiShotLatch(); + + /** + * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to + * produce rolling files. The clock of DateTimeBucketer is set to + * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using + * latches. + */ + @Test + public void testDateTimeRollingStringWriter() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/rolling-out"; + DateTimeBucketer.setClock(new ModifyableClock()); + ModifyableClock.setCurrentTime(0); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + + + DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction( + NUM_ELEMENTS)) + .broadcast(); + + // the parallel flatMap is chained to the sink, so when it has seen 5 elements it can + // fire the latch + DataStream<String> mapped = source + .flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() { + private static final long serialVersionUID = 1L; + + int count = 0; + @Override + public void flatMap(Tuple2<Integer, String> value, + Collector<String> out) throws Exception { + out.collect(value.f1); + count++; + if (count >= 5) { + if 
(getRuntimeContext().getIndexOfThisSubtask() == 0) { + latch1.trigger(); + } else { + latch2.trigger(); + } + count = 0; + } + } + + }); + + RollingSink<String> sink = new RollingSink<String>(outPath) + .setBucketer(new DateTimeBucketer("ss")) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + mapped.addSink(sink); + + env.execute("RollingSink String Write Test"); + + RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true); + + // we should have 8 rolling files, 4 time intervals and parallelism of 2 + int numFiles = 0; + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + numFiles++; + if (file.getPath().toString().contains("rolling-out/00")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < 5; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/05")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 5; i < 10; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/10")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 10; i < 15; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/15")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 15; i < 20; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + 
inStream.close(); + } else { + Assert.fail("File " + file + " does not match any expected roll pattern."); + } + } + + Assert.assertEquals(8, numFiles); + } + + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + // we have a bucket size of 5 bytes, so each record will get its own bucket, + // i.e. the bucket should roll after every record. + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 1, 1 ,0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.snapshot(0, 0); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); + checkFs(outDir, 1, 0, 2, 0); + + OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + + testHarness.close(); + checkFs(outDir, 0, 1, 2, 0); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + checkFs(outDir, 0, 0, 3, 1); + + snapshot = testHarness.snapshot(2, 0); + + testHarness.processElement(new StreamRecord<>("test4", 10)); + checkFs(outDir, 1, 0, 3, 1); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + // the in-progress file remains as we do not clean up now + checkFs(outDir, 1, 0, 3, 1); + + testHarness.close(); + + // at close it is not moved to final because it is not part + 
// of the current task's state, it was just a not cleaned up leftover. + checkFs(outDir, 1, 0, 3, 1); + } + + @Test + public void testScalingDown() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1); + testHarness2.setup(); + testHarness2.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); + testHarness3.setup(); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + testHarness2.processElement(new StreamRecord<>("test4", 0L)); + testHarness2.processElement(new StreamRecord<>("test5", 0L)); + testHarness2.processElement(new StreamRecord<>("test6", 0L)); + checkFs(outDir, 2, 4, 0, 0); + + testHarness3.processElement(new StreamRecord<>("test7", 0L)); + testHarness3.processElement(new StreamRecord<>("test8", 0L)); + checkFs(outDir, 3, 5, 0, 0); + + // intentionally we snapshot them in a not ascending order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness3.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + // with the above state reshuffling, we expect testHarness4 to take the + // state of the previous testHarness3 and testHarness1 while testHarness5 + // will take that of the previous testHarness1 + + OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0); + testHarness4.setup(); + testHarness4.initializeState(mergedSnapshot); + testHarness4.open(); + + // we do not have a 
length file for part-2-0 because bucket part-2-0 + // was not "in-progress", but "pending" (its full content is valid). + checkFs(outDir, 1, 4, 3, 2); + + OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1); + testHarness5.setup(); + testHarness5.initializeState(mergedSnapshot); + testHarness5.open(); + + checkFs(outDir, 0, 0, 8, 3); + } + + @Test + public void testScalingUp() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + testHarness1.processElement(new StreamRecord<>("test2", 0L)); + + checkFs(outDir, 1, 1, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + testHarness2.processElement(new StreamRecord<>("test4", 0L)); + testHarness2.processElement(new StreamRecord<>("test5", 0L)); + + checkFs(outDir, 2, 3, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 3, 0); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 1, 1, 3, 1); + + testHarness2 = createRescalingTestSink(outDir, 3, 1); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 0, 0, 5, 2); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); + testHarness3.setup(); + testHarness3.initializeState(mergedSnapshot); + 
testHarness3.open(); + + checkFs(outDir, 0, 0, 5, 2); + + testHarness1.processElement(new StreamRecord<>("test6", 0)); + testHarness2.processElement(new StreamRecord<>("test6", 0)); + testHarness3.processElement(new StreamRecord<>("test6", 0)); + + // 3 for the different tasks + checkFs(outDir, 3, 0, 5, 2); + + testHarness1.snapshot(1, 0); + testHarness2.snapshot(1, 0); + testHarness3.snapshot(1, 0); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + + checkFs(outDir, 0, 3, 5, 2); + } + + private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink( + File outDir, int totalParallelism, int taskIdx) throws Exception { + + RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath()) + .setWriter(new StringWriter<String>()) + .setBatchSize(5) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + return createTestSink(sink, totalParallelism, taskIdx); + } + + private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( + RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception { + return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); + } + + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + String path = file.getPath(); + if (path.endsWith(IN_PROGRESS_SUFFIX)) { + inProg++; + } else if (path.endsWith(PENDING_SUFFIX)) { + pend++; + } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { + val++; + } else if (path.contains(PART_PREFIX)) { + compl++; + } + } + + Assert.assertEquals(inprogress, inProg); + 
Assert.assertEquals(pending, pend); + Assert.assertEquals(completed, compl); + Assert.assertEquals(valid, val); + } + + private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + private final int numElements; + + public TestSourceFunction(int numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { + for (int i = 0; i < numElements && running; i++) { + ctx.collect(Tuple2.of(i, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + /** + * This waits on the two multi-shot latches. The latches are triggered in a parallel + * flatMap inside the test topology. + */ + private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + private final int numElements; + + public WaitingTestSourceFunction(int numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { + for (int i = 0; i < numElements && running; i++) { + if (i % 5 == 0 && i > 0) { + // update the clock after "five seconds", so we get 20 seconds in total + // with 5 elements in each time window + latch1.await(); + latch2.await(); + ModifyableClock.setCurrentTime(i * 1000); + } + ctx.collect(Tuple2.of(i, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + + private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> { + private String key; + private String expect; + public StreamWriterWithConfigCheck(String key, String expect) { + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, 
path); + Assert.assertEquals(expect, fs.getConf().get(key)); + } + + @Override + public Writer<T> duplicate() { + return new StreamWriterWithConfigCheck<>(key, expect); + } + } + + public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple2<Integer, String> value) throws Exception { + if (getRuntimeContext().getIndexOfThisSubtask() == 0) { + return value.f0 % 2 == 0; + } else { + return value.f0 % 2 == 1; + } + } + } + + public static class ModifyableClock implements Clock { + + private static volatile long currentTime = 0; + + public static void setCurrentTime(long currentTime) { + ModifyableClock.currentTime = currentTime; + } + + @Override + public long currentTimeMillis() { + return currentTime; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java new file mode 100644 index 0000000..eb12d07 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestingSecurityContext;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;

/**
 * Runs the {@link RollingSink} tests of {@link RollingSinkITCase} against a Kerberos-secured
 * {@link MiniDFSCluster} and a secured Flink mini cluster.
 *
 * <p>Note: only executed for Hadoop version 3.x.x or newer (see HDFS-9213); on older versions
 * all tests are skipped via a JUnit assumption.
 */
public class RollingSinkSecuredITCase extends RollingSinkITCase {

	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);

	/**
	 * Skips all tests if the Hadoop version doesn't match.
	 * We can't run this test class until HDFS-9213 is fixed which allows a secure DataNode
	 * to bind to non-privileged ports for testing.
	 * For now, we skip this test class until Hadoop version 3.x.x.
	 *
	 * @throws IllegalStateException if the version string does not start with 'X.X.X'
	 */
	private static void skipIfHadoopVersionIsNotAppropriate() {
		String hadoopVersionString = VersionInfo.getVersion();
		String[] split = hadoopVersionString.split("\\.");
		// vendor builds may append further components (e.g. "2.7.3.2.6.0"); only the leading
		// "X.X.X" part is required, and only the major version is evaluated
		if (split.length < 3) {
			throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
		}
		Assume.assumeTrue(
			// check whether we're running Hadoop version >= 3.x.x
			Integer.parseInt(split[0]) >= 3
		);
	}

	/*
	 * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
	 * and out-of-order sequence for secure cluster
	 */
	@BeforeClass
	public static void setup() throws Exception {}

	@AfterClass
	public static void teardown() throws Exception {}

	@BeforeClass
	public static void createHDFS() throws IOException {}

	@AfterClass
	public static void destroyHDFS() {}

	/**
	 * Prepares the secure test environment (KDC, keytabs), installs the testing security
	 * context, writes the secured Hadoop configuration to an hdfs-site.xml exposed through
	 * HADOOP_CONF_DIR, and starts the secured MiniDFSCluster and Flink cluster.
	 */
	@BeforeClass
	public static void startSecureCluster() throws Exception {

		skipIfHadoopVersionIsNotAppropriate();

		LOG.info("starting secure cluster environment for testing");

		dataDir = tempFolder.newFolder();

		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());

		SecureTestEnvironment.prepare(tempFolder);

		populateSecureConfigurations();

		Configuration flinkConfig = new Configuration();
		flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
			SecureTestEnvironment.getTestKeytab());
		flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
			SecureTestEnvironment.getHadoopServicePrincipal());

		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
		ctx.setHadoopConfiguration(conf);
		try {
			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
		} catch (Exception e) {
			// plain exception message (not a log format string); the cause carries the reason
			throw new RuntimeException("Exception occurred while setting up secure test context.", e);
		}

		File hdfsSiteXML = new File(dataDir, "hdfs-site.xml");

		// try-with-resources closes the writer even if writeXml fails
		try (FileWriter writer = new FileWriter(hdfsSiteXML)) {
			conf.writeXml(writer);
		}

		Map<String, String> map = new HashMap<>(System.getenv());
		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
		TestBaseUtils.setEnv(map);

		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
		builder.checkDataNodeAddrConfig(true);
		builder.checkDataNodeHostConfig(true);
		hdfsCluster = builder.build();

		dfs = hdfsCluster.getFileSystem();

		hdfsURI = "hdfs://"
			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
			+ "/";

		startSecureFlinkClusterWithRecoveryModeEnabled();
	}

	/** Stops the Flink cluster and the MiniDFSCluster and cleans up the secure environment. */
	@AfterClass
	public static void teardownSecureCluster() throws Exception {
		LOG.info("tearing down secure cluster environment");

		TestStreamEnvironment.unsetAsContext();
		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

		if (hdfsCluster != null) {
			hdfsCluster.shutdown();
		}

		SecureTestEnvironment.cleanup();
	}

	/** Adds the Kerberos / block-token / data-transfer settings for a secured HDFS to {@code conf}. */
	private static void populateSecureConfigurations() {

		String dataTransferProtection = "authentication";

		SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
		conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
		conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
		conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
		conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
		conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());

		conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);

		conf.set("dfs.data.transfer.protection", dataTransferProtection);

		conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());

		conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");

		conf.setInt("dfs.datanode.socket.write.timeout", 0);

		/*
		 * We are setting the port number to privileged port - see HDFS-9213
		 * This requires the user to have root privilege to bind to the port
		 * Use below command (ubuntu) to set privilege to java process for the
		 * bind() to work if the java process is not running as root.
		 * setcap 'cap_net_bind_service=+ep' /path/to/java
		 */
		conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
		conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
	}

	/**
	 * Starts a Flink mini cluster (1 TaskManager, 3 JobManagers) with ZooKeeper high availability
	 * and the filesystem state backend on the secured HDFS, and sets it as the stream context.
	 */
	private static void startSecureFlinkClusterWithRecoveryModeEnabled() throws Exception {
		LOG.info("Starting Flink and ZK in secure mode");

		dfs.mkdirs(new Path("/flink/checkpoints"));
		dfs.mkdirs(new Path("/flink/recovery"));

		Configuration config = new Configuration();

		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
		// NOTE(review): ZOOKEEPER_CHECKPOINTS_PATH looks like a ZooKeeper node path setting rather
		// than an HDFS URI -- confirm this value is intended
		config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
		config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");

		SecureTestEnvironment.populateFlinkSecureConfigurations(config);

		cluster = TestBaseUtils.startCluster(config, false);
		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
	}

	/* For secure cluster testing, it is enough to run only one test and override below test methods
	 * to keep the overall build time minimal
	 */
	@Override
	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {}

	@Override
	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {}

	@Override
	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {}

	@Override
	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {}

	@Override
	public void testDateTimeRollingStringWriter() throws Exception {}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java new file mode 100644 index 0000000..54703a3 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.flink.streaming.connectors.fs.bucketing;

import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link BucketingSink}.
 *
 * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
 * rolling behaviour.
 */
public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {

	/** Total number of strings emitted by all parallel source instances together. */
	static final long NUM_STRINGS = 16_000;

	@ClassRule
	public static TemporaryFolder tempFolder = new TemporaryFolder();

	private static MiniDFSCluster hdfsCluster;
	private static org.apache.hadoop.fs.FileSystem dfs;

	private static String outPath;

	private static final String PENDING_SUFFIX = ".pending";
	private static final String IN_PROGRESS_SUFFIX = ".in-progress";

	/** Suffix of the side file that, when present, holds the number of valid bytes of a part file. */
	private static final String VALID_LENGTH_SUFFIX = ".valid-length";

	/** Starts a MiniDFSCluster in a temporary folder and derives the sink's output path from it. */
	@BeforeClass
	public static void createHDFS() throws IOException {
		Configuration conf = new Configuration();

		File dataDir = tempFolder.newFolder();

		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
		hdfsCluster = builder.build();

		dfs = hdfsCluster.getFileSystem();

		outPath = "hdfs://"
				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
				+ "/string-non-rolling-out";
	}

	@AfterClass
	public static void destroyHDFS() {
		if (hdfsCluster != null) {
			hdfsCluster.shutdown();
		}
	}

	/**
	 * Builds the job: a parallel string source, an identity mapper that fails once, and a
	 * {@link BucketingSink} writing everything into the base path with checkpointing every 20 ms.
	 */
	@Override
	public void testProgram(StreamExecutionEnvironment env) {
		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);

		int parallelism = 12;

		env.enableCheckpointing(20);
		env.setParallelism(parallelism);
		env.disableOperatorChaining();

		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();

		DataStream<String> mapped = stream
				.map(new OnceFailingIdentityMapper(NUM_STRINGS));

		BucketingSink<String> sink = new BucketingSink<String>(outPath)
				.setBucketer(new BasePathBucketer<String>())
				.setBatchSize(10000)
				.setValidLengthPrefix("")
				.setPendingPrefix("")
				.setPendingSuffix(PENDING_SUFFIX)
				.setInProgressSuffix(IN_PROGRESS_SUFFIX);

		mapped.addSink(sink);
	}

	/**
	 * Reads back all written files and verifies that every message was written and that no
	 * message appears twice in committed (neither pending nor in-progress) files.
	 */
	@Override
	public void postSubmit() throws Exception {
		// We read the files and verify that we have read all the strings. If a valid-length
		// file exists we only read the file to that point. (This test should work with
		// FileSystems that support truncate() and with others as well.)

		Pattern messageRegex = Pattern.compile("message (\\d+)");

		// IDs of all messages read from any file; its size must equal NUM_STRINGS,
		// otherwise some message was lost.
		Set<Integer> readNumbers = Sets.newHashSet();

		HashSet<String> uniqMessagesRead = new HashSet<>();
		HashSet<String> messagesInCommittedFiles = new HashSet<>();

		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);

		while (files.hasNext()) {
			LocatedFileStatus file = files.next();
			String fileName = file.getPath().toString();

			if (fileName.endsWith(VALID_LENGTH_SUFFIX)) {
				// length markers are consulted when reading the part file they belong to
				continue;
			}

			boolean committed = !fileName.endsWith(IN_PROGRESS_SUFFIX) && !fileName.endsWith(PENDING_SUFFIX);
			byte[] validBytes = readPrefix(file.getPath(), readValidLength(file));

			// the sink's StringWriter encodes records as UTF-8 by default
			try (BufferedReader br = new BufferedReader(
					new InputStreamReader(new ByteArrayInputStream(validBytes), "UTF-8"))) {

				String line;
				while ((line = br.readLine()) != null) {
					Matcher matcher = messageRegex.matcher(line);
					if (!matcher.matches()) {
						Assert.fail("Read line does not match expected pattern.");
					}
					uniqMessagesRead.add(line);

					// check that in the committed files there are no duplicates
					if (committed && !messagesInCommittedFiles.add(line)) {
						Assert.fail("Duplicate entry in committed bucket.");
					}

					readNumbers.add(Integer.parseInt(matcher.group(1)));
				}
			}
		}

		// Verify that we read all strings (at-least-once)
		Assert.assertEquals(NUM_STRINGS, readNumbers.size());

		// The number of distinct messages must match as well. Duplicates inside committed files
		// were already rejected while reading (exactly-once).
		Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
	}

	/**
	 * Returns the number of valid bytes of {@code file}: the value stored in its valid-length
	 * marker file if one exists, otherwise the file's full length.
	 */
	private static int readValidLength(LocatedFileStatus file) throws IOException {
		Path validLengthFile = file.getPath().suffix(VALID_LENGTH_SUFFIX);
		if (!dfs.exists(validLengthFile)) {
			return (int) file.getLen();
		}
		try (FSDataInputStream in = dfs.open(validLengthFile)) {
			int validLength = Integer.parseInt(in.readUTF());
			System.out.println("VALID LENGTH: " + validLength);
			return validLength;
		}
	}

	/** Reads the first {@code length} bytes of {@code path}, closing the stream afterwards. */
	private static byte[] readPrefix(Path path, int length) throws IOException {
		byte[] buffer = new byte[length];
		try (FSDataInputStream in = dfs.open(path)) {
			in.readFully(0, buffer, 0, length);
		}
		return buffer;
	}

	/**
	 * Identity mapper that throws a test failure once a randomly chosen element count is
	 * reached. The static {@code hasFailed} flag prevents it from failing again after recovery.
	 */
	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
		private static final long serialVersionUID = 1L;

		private static volatile boolean hasFailed = false;

		private final long numElements;

		private long failurePos;
		private long count;

		OnceFailingIdentityMapper(long numElements) {
			this.numElements = numElements;
		}

		@Override
		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
			long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
			long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
			long range = failurePosMax - failurePosMin;

			// Random#nextLong() may be negative, so a plain '%' could place the failure below
			// failurePosMin (even at a negative position, failing on the very first element).
			// Floor the remainder into [0, range); an empty range would otherwise divide by zero.
			long offset = 0;
			if (range > 0) {
				offset = ((new Random().nextLong() % range) + range) % range;
			}
			failurePos = failurePosMin + offset;
			count = 0;
		}

		@Override
		public String map(String value) throws Exception {
			count++;
			if (!hasFailed && count >= failurePos) {
				hasFailed = true;
				throw new Exception("Test Failure");
			}

			return value;
		}
	}

	/**
	 * Parallel source emitting "message i" for every i below {@code numElements}; each subtask
	 * emits the indices congruent to its subtask index, and the next index is checkpointed.
	 */
	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
			implements CheckpointedAsynchronously<Integer> {

		private static final long serialVersionUID = 1L;

		private final long numElements;

		private int index;

		private volatile boolean isRunning = true;

		StringGeneratingSourceFunction(long numElements) {
			this.numElements = numElements;
		}

		@Override
		public void run(SourceContext<String> ctx) throws Exception {
			final Object lockingObject = ctx.getCheckpointLock();

			final int step = getRuntimeContext().getNumberOfParallelSubtasks();

			if (index == 0) {
				index = getRuntimeContext().getIndexOfThisSubtask();
			}

			while (isRunning && index < numElements) {

				Thread.sleep(1);
				// emit and advance atomically with respect to checkpoints
				synchronized (lockingObject) {
					ctx.collect("message " + index);
					index += step;
				}
			}
		}

		@Override
		public void cancel() {
			isRunning = false;
		}

		@Override
		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
			return index;
		}

		@Override
		public void restoreState(Integer state) {
			index = state;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java new file mode 100644 index 0000000..d671874 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -0,0 +1,867 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; +import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; +import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.NetUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +public class BucketingSinkTest { + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + private static MiniDFSCluster hdfsCluster; + private static org.apache.hadoop.fs.FileSystem dfs; + private static String hdfsURI; + + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink( + File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception { + + BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath()) + .setBucketer(new Bucketer<String>() { + private static final long serialVersionUID = 1L; + + @Override + public Path getBucketPath(Clock clock, Path basePath, String element) { + return new Path(basePath, element); + } + }) + .setWriter(new StringWriter<String>()) + .setInactiveBucketCheckInterval(inactivityInterval) + .setInactiveBucketThreshold(inactivityInterval) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + 
.setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + return createTestSink(sink, totalParallelism, taskIdx); + } + + private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, int totalParallelism, int taskIdx) throws Exception { + BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath()) + .setBucketer(new Bucketer<String>() { + private static final long serialVersionUID = 1L; + + @Override + public Path getBucketPath(Clock clock, Path basePath, String element) { + return new Path(basePath, element); + } + }) + .setWriter(new StringWriter<String>()) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setInactiveBucketCheckInterval(5*60*1000L) + .setInactiveBucketThreshold(5*60*1000L) + .setPendingSuffix(PENDING_SUFFIX) + .setInProgressSuffix(IN_PROGRESS_SUFFIX); + + return createTestSink(sink, totalParallelism, taskIdx); + } + + private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( + BucketingSink<T> sink, int totalParallelism, int taskIdx) throws Exception { + return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); + } + + @BeforeClass + public static void createHDFS() throws IOException { + Configuration conf = new Configuration(); + + File dataDir = tempFolder.newFolder(); + + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + hdfsCluster = builder.build(); + + dfs = hdfsCluster.getFileSystem(); + + hdfsURI = "hdfs://" + + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) + + "/"; + } + + @AfterClass + public static void destroyHDFS() { + hdfsCluster.shutdown(); + } + + @Test + public void testInactivityPeriodWithLateNotify() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); + 
testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 2, 0 ,0, 0); + + testHarness.setProcessingTime(101L); // put some in pending + checkFs(outDir, 0, 2, 0, 0); + + testHarness.snapshot(0, 0); // put them in pending for 0 + checkFs(outDir, 0, 2, 0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + testHarness.processElement(new StreamRecord<>("test4", 1L)); + + testHarness.setProcessingTime(202L); // put some in pending + + testHarness.snapshot(1, 0); // put them in pending for 1 + checkFs(outDir, 0, 4, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); // put the pending for 0 to the "committed" state + checkFs(outDir, 0, 2, 2, 0); + + testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state + checkFs(outDir, 0, 0, 4, 0); + } + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 2, 0 ,0, 0); + + // this is to check the inactivity threshold + testHarness.setProcessingTime(101L); + checkFs(outDir, 0, 2, 0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.snapshot(0, 0); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); + checkFs(outDir, 1, 0, 2, 0); + + OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + + testHarness.close(); + checkFs(outDir, 0, 1, 2, 0); + + testHarness = createRescalingTestSink(outDir, 1, 0, 100); + 
testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + checkFs(outDir, 0, 0, 3, 1); + + snapshot = testHarness.snapshot(2, 0); + + testHarness.processElement(new StreamRecord<>("test4", 10)); + checkFs(outDir, 1, 0, 3, 1); + + testHarness = createRescalingTestSink(outDir, 1, 0, 100); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + // the in-progress file remains as we do not clean up now + checkFs(outDir, 1, 0, 3, 1); + + testHarness.close(); + + // at close it is not moved to final because it is not part + // of the current task's state, it was just a not cleaned up leftover. + checkFs(outDir, 1, 0, 3, 1); + } + /** Two subtasks are restored at unchanged parallelism from a snapshot whose per-subtask states were merged in reverse order. */ + @Test + public void testSameParallelismWithShufflingStates() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); // expected counts: in-progress, pending, completed, valid-length + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + checkFs(outDir, 2, 0, 0, 0); + + // snapshot the subtasks in reverse order on purpose, so that the states are shuffled on restore + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + checkFs(outDir, 2, 0, 0, 0); + + // this will not be included in any checkpoint so it can be cleaned up (although we do not) + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + checkFs(outDir, 3, 0, 0, 0); + + testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + // the one in-progress will be
the one assigned to the next instance, + // the other is the test3 which is just not cleaned up + checkFs(outDir, 2, 0, 1, 1); + + testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 1, 0, 2, 2); + + testHarness1.close(); + testHarness2.close(); + + // the remaining in-progress file (test3's) was never checkpointed, so it could be discarded; the sink leaves it in place + checkFs(outDir, 1, 0, 2, 2); + } + /** Three subtasks are snapshotted and their merged state is restored into two subtasks. */ + @Test + public void testScalingDown() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); + testHarness2.setup(); + testHarness2.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); + testHarness3.setup(); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + checkFs(outDir, 2, 0, 0, 0); + + testHarness3.processElement(new StreamRecord<>("test3", 0L)); + testHarness3.processElement(new StreamRecord<>("test4", 0L)); + checkFs(outDir, 4, 0, 0, 0); + + // intentionally we snapshot them out of order (3, 1, 2) so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness3.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 1, 0, 3, 3); + + testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); +
testHarness2.open(); + + checkFs(outDir, 0, 0, 4, 4); + } + /** Two subtasks are snapshotted and their merged state is restored into three subtasks. */ + @Test + public void testScalingUp() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); + testHarness1.setup(); + testHarness1.open(); + + // second subtask of the original parallelism-2 job: index 1 (index 0 is already testHarness1) + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 1L)); + testHarness1.processElement(new StreamRecord<>("test2", 1L)); + + checkFs(outDir, 2, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test3", 1L)); + testHarness2.processElement(new StreamRecord<>("test4", 1L)); + testHarness2.processElement(new StreamRecord<>("test5", 1L)); + + checkFs(outDir, 5, 0, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 2, 0, 3, 3); + + testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 0, 0, 5, 5); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); + testHarness3.setup(); + testHarness3.initializeState(mergedSnapshot); + testHarness3.open(); + + checkFs(outDir, 0, 0, 5, 5); + + testHarness1.processElement(new StreamRecord<>("test6", 0)); + testHarness2.processElement(new StreamRecord<>("test6", 0)); + testHarness3.processElement(new StreamRecord<>("test6", 0)); + + checkFs(outDir, 3, 0, 5, 5); + + testHarness1.snapshot(1, 0);
+ + testHarness2.snapshot(1, 0); + testHarness3.snapshot(1, 0); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + // after snapshot and close, the three in-progress test6 files are pending + checkFs(outDir, 0, 3, 5, 5); + } + /** Counts the files under outDir by state (crc files skipped) and asserts the expected in-progress, pending, completed and valid-length counts. */ + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + String path = file.getPath(); + if (path.endsWith(IN_PROGRESS_SUFFIX)) { + inProg++; + } else if (path.endsWith(PENDING_SUFFIX)) { + pend++; + } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { + val++; + } else if (path.contains(PART_PREFIX)) { // checked last: a part file with none of the state suffixes counts as completed + compl++; + } + } + + Assert.assertEquals(inprogress, inProg); + Assert.assertEquals(pending, pend); + Assert.assertEquals(completed, compl); + Assert.assertEquals(valid, val); + } + + /** + * This tests {@link StringWriter} with + * non-bucketing output. + */ + @Test + public void testNonRollingStringWriter() throws Exception { + final String outPath = hdfsURI + "/string-non-rolling-out"; + + final int numElements = 20; + + BucketingSink<String> sink = new BucketingSink<String>(outPath) + .setBucketer(new BasePathBucketer<String>()) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i))); + } + + testHarness.close(); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < numElements; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } +
inStream.close(); + } + + /** + * This tests {@link SequenceFileWriter} + * with non-rolling output and without compression. + */ + @Test + public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception { + final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out"; + + final int numElements = 20; + + BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath) + .setWriter(new SequenceFileWriter<IntWritable, Text>()) + .setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>()) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setPendingSuffix(""); // NOTE(review): presumably empty pending prefix/suffix make the file readable under PART_PREFIX-0-0 below — confirm + + sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig()); // assumed: lets the SequenceFileWriter learn the key/value classes — confirm + + OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness = + createTestSink(sink, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Tuple2.of( + new IntWritable(i), + new Text("message #" + Integer.toString(i)) + ))); + } + + testHarness.close(); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); + + SequenceFile.Reader reader = new SequenceFile.Reader(inStream, 1000, 0, 100000, new Configuration()); // args: buffer size, start offset, length + + IntWritable intWritable = new IntWritable(); + Text txt = new Text(); + + for (int i = 0; i < numElements; i++) { + reader.next(intWritable, txt); + Assert.assertEquals(i, intWritable.get()); + Assert.assertEquals("message #" + i, txt.toString()); + } + + reader.close(); + inStream.close(); + } + + /** + * This tests {@link AvroKeyValueSinkWriter} + * with non-rolling output and with compression.
+ */ + @Test + public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception { + final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; // NOTE(review): directory is named "no-comp" although CONF_COMPRESS is set to true below; consider renaming + + final int numElements = 20; + + Map<String, String> properties = new HashMap<>(); + Schema keySchema = Schema.create(Schema.Type.INT); + Schema valueSchema = Schema.create(Schema.Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + + BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath) + .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) + .setBucketer(new BasePathBucketer<Tuple2<Integer, String>>()) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = + createTestSink(sink, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Tuple2.of( + i, "message #" + Integer.toString(i) + ))); + } + + testHarness.close(); + + GenericData.setStringType(valueSchema, GenericData.StringType.String); // read values back as java.lang.String so getValue() below can be assigned to a String + Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); + + SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema); + DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader); + for (int i = 0; i < numElements; i++) { + AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry = + new
AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); + int key = wrappedEntry.getKey(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + + /** + * This uses {@link DateTimeBucketer} to + * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually + * advance processing time. + */ + @Test + public void testDateTimeRollingStringWriter() throws Exception { + final int numElements = 20; + + final String outPath = hdfsURI + "/rolling-out"; + + BucketingSink<String> sink = new BucketingSink<String>(outPath) + .setBucketer(new DateTimeBucketer<String>("ss")) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + // Every 5 elements, advance the clock to i seconds; with the "ss" format this gives buckets 00, 05, 10 and 15, each holding 5 elements.
+ if (i % 5 == 0) { + testHarness.setProcessingTime(i * 1000L); + } + testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i))); + } + + testHarness.close(); + + RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true); + + // We should have 4 rolling files, one per bucket directory (00, 05, 10, 15) + int numFiles = 0; + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + numFiles++; + if (file.getPath().toString().contains("rolling-out/00")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < 5; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/05")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 5; i < 10; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/10")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 10; i < 15; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else if (file.getPath().toString().contains("rolling-out/15")) { + FSDataInputStream inStream = dfs.open(file.getPath()); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 15; i < 20; i++) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } else { + Assert.fail("File " + file + " does not match any expected roll pattern."); + } + } + + Assert.assertEquals(4, numFiles); + } + + /** + * This uses a custom bucketing function which determines
the bucket from the input. + */ + @Test + public void testCustomBucketing() throws Exception { + File dataDir = tempFolder.newFolder(); + + final int numIds = 4; + final int numElements = 20; + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Integer.toString(i % numIds))); + } + + testHarness.close(); + + // elements cycle through the ids 0..3, so we should have 4 buckets, with 1 file each + int numFiles = 0; + for (File file: FileUtils.listFiles(dataDir, null, true)) { + if (file.getName().startsWith(PART_PREFIX)) { + numFiles++; + } + } + + Assert.assertEquals(4, numFiles); + } + + /** + * This uses a custom bucketing function which determines the bucket from the input. + * We use a simulated clock to reduce the number of buckets being written to over time. + * This causes buckets to become 'inactive' and their file parts 'closed' by the sink.
+ */ + @Test + public void testCustomBucketingInactiveBucketCleanup() throws Exception { + File dataDir = tempFolder.newFolder(); + + final int step1NumIds = 4; + final int step2NumIds = 2; + final int numElementsPerStep = 20; + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElementsPerStep; i++) { + testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds))); + } + + testHarness.setProcessingTime(2*60*1000L); + + for (int i = 0; i < numElementsPerStep; i++) { + testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); + } + + testHarness.setProcessingTime(6*60*1000L); + + for (int i = 0; i < numElementsPerStep; i++) { + testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); + } + + // counted before close(): we should have 4 buckets, with 1 file each + // 2 of these buckets (ids 2 and 3, unused after the first step) should have been finalised due to becoming inactive + int numFiles = 0; + int numInProgress = 0; + for (File file: FileUtils.listFiles(dataDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + if (file.getPath().endsWith(IN_PROGRESS_SUFFIX)) { + numInProgress++; + } + numFiles++; + } + + testHarness.close(); + + Assert.assertEquals(4, numFiles); + Assert.assertEquals(2, numInProgress); + } + + /** + * This tests that a user-defined Hadoop configuration passed via setFSConfig reaches the FileSystem used by the writer. + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + final int numElements = 20; + + Map<String, String> properties = new HashMap<>(); + Schema keySchema = Schema.create(Schema.Type.INT); + Schema valueSchema = Schema.create(Schema.Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
valueSchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); // the value StreamWriterWithConfigCheck expects to see on open() + + BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath) + .setFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960")) + .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>()) + .setPartPrefix(PART_PREFIX) + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = + createTestSink(sink, 1, 0); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Tuple2.of( + i, "message #" + Integer.toString(i) + ))); + } + + testHarness.close(); + + GenericData.setStringType(valueSchema, GenericData.StringType.String); + Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); + + SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema); + DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader); + for (int i = 0; i < numElements; i++) { + AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry = + new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); + int key = wrappedEntry.getKey(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + /** Avro key/value writer that asserts on open() that the FileSystem's configuration holds an expected value. */ + private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> { + private
Map<String, String> properties; + private String key; + private String expect; + public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) { + super(properties); + this.properties = properties; + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + Assert.assertEquals(expect, fs.getConf().get(key)); // the FileSystem handed to the writer must carry the configured value + } + + @Override + public Writer<Tuple2<K, V>> duplicate() { + return new StreamWriterWithConfigCheck<>(properties, key, expect); // copies keep performing the same check + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..5c22851 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + +log4j.logger.org.apache.directory=OFF, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file
