http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java deleted file mode 100644 index 80ae294..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ /dev/null @@ -1,991 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.fs; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.file.DataFileConstants; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericData.StringType; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.apache.flink.util.NetUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; - -/** - * Tests for {@link RollingSink}. These - * tests test the different output methods as well as the rolling feature using a manual clock - * that increases time in lockstep with element computation using latches. - * - * <p> - * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies - * exactly once behaviour. - * - * @deprecated should be removed with the {@link RollingSink}. - */ -@Deprecated -public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { - - protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class); - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - protected static MiniDFSCluster hdfsCluster; - protected static org.apache.hadoop.fs.FileSystem dfs; - protected static String hdfsURI; - protected static Configuration conf = new Configuration(); - - protected static File dataDir; - - @BeforeClass - public static void createHDFS() throws IOException { - - LOG.info("In RollingSinkITCase: Starting MiniDFSCluster "); - - dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - hdfsURI = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/"; - } - - @AfterClass - public static void destroyHDFS() { - LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster "); - hdfsCluster.shutdown(); - } - - /** - * This tests {@link StringWriter} with - * non-rolling output. - */ - @Test - public void testNonRollingStringWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/string-non-rolling-out"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - RollingSink<String> sink = new RollingSink<String>(outPath) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - source - .map(new MapFunction<Tuple2<Integer,String>, String>() { - private static final long serialVersionUID = 1L; - @Override - public String map(Tuple2<Integer, String> value) throws Exception { - return value.f1; - } - }) - .addSink(sink); - - env.execute("RollingSink String Write Test"); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - - br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } - - /** - * This tests {@link SequenceFileWriter} - * with non-rolling output and without compression. - */ - @Test - public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception { - return Tuple2.of(new IntWritable(value.f0), new Text(value.f1)); - } - }); - - - RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath) - .setWriter(new SequenceFileWriter<IntWritable, Text>()) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - mapped.addSink(sink); - - env.execute("RollingSink String Write Test"); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - - SequenceFile.Reader reader = new SequenceFile.Reader(inStream, - 1000, - 0, - 100000, - new Configuration()); - - IntWritable intWritable = new IntWritable(); - Text txt = new Text(); - - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - reader.next(intWritable, txt); - Assert.assertEquals(i, intWritable.get()); - Assert.assertEquals("message #" + i, txt.toString()); - } - - reader.close(); - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - - reader = new SequenceFile.Reader(inStream, - 1000, - 0, - 100000, - new Configuration()); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - reader.next(intWritable, txt); - Assert.assertEquals(i, intWritable.get()); - Assert.assertEquals("message #" + i, txt.toString()); - } - - reader.close(); - inStream.close(); - } - - /** - * This tests {@link SequenceFileWriter} - * with non-rolling output but with compression. - */ - @Test - public void testNonRollingSequenceFileWithCompressionWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/seq-non-rolling-out"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception { - return Tuple2.of(new IntWritable(value.f0), new Text(value.f1)); - } - }); - - - RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath) - .setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK)) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - mapped.addSink(sink); - - env.execute("RollingSink String Write Test"); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - - SequenceFile.Reader reader = new SequenceFile.Reader(inStream, - 1000, - 0, - 100000, - new Configuration()); - - IntWritable intWritable = new IntWritable(); - Text txt = new Text(); - - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - reader.next(intWritable, txt); - Assert.assertEquals(i, intWritable.get()); - Assert.assertEquals("message #" + i, txt.toString()); - } - - reader.close(); - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - - reader = new SequenceFile.Reader(inStream, - 1000, - 0, - 100000, - new Configuration()); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - reader.next(intWritable, txt); - Assert.assertEquals(i, intWritable.get()); - Assert.assertEquals("message #" + i, txt.toString()); - } - - reader.close(); - inStream.close(); - } - - - /** - * This tests {@link AvroKeyValueSinkWriter} - * with non-rolling output and without compression. - */ - @Test - public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - - Map<String, String> properties = new HashMap<>(); - Schema keySchema = Schema.create(Type.INT); - Schema valueSchema = Schema.create(Type.STRING); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); - RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath) - .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - source.addSink(sink); - - env.execute("RollingSink Avro KeyValue Writer Test"); - - GenericData.setStringType(valueSchema, StringType.String); - Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema); - DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); - int key = wrappedEntry.getKey().intValue(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); - int key = wrappedEntry.getKey().intValue(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - } - - /** - * This tests {@link AvroKeyValueSinkWriter} - * with non-rolling output and with compression. - */ - @Test - public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - - Map<String, String> properties = new HashMap<>(); - Schema keySchema = Schema.create(Type.INT); - Schema valueSchema = Schema.create(Type.STRING); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); - RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath) - .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - source.addSink(sink); - - env.execute("RollingSink Avro KeyValue Writer Test"); - - GenericData.setStringType(valueSchema, StringType.String); - Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema); - DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); - int key = wrappedEntry.getKey().intValue(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next()); - int key = wrappedEntry.getKey().intValue(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - } - - - /** - * This tests user defined hdfs configuration - * @throws Exception - */ - @Test - public void testUserDefinedConfiguration() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/string-non-rolling-with-config"; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) - .broadcast() - .filter(new OddEvenFilter()); - - Configuration conf = new Configuration(); - conf.set("io.file.buffer.size", "40960"); - RollingSink<String> sink = new RollingSink<String>(outPath) - .setFSConfig(conf) - .setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960")) - .setBucketer(new NonRollingBucketer()) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - source - .map(new MapFunction<Tuple2<Integer,String>, String>() { - private static final long serialVersionUID = 1L; - @Override - public String map(Tuple2<Integer, String> value) throws Exception { - return value.f1; - } - }) - .addSink(sink); - - env.execute("RollingSink with configuration Test"); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 0; i < NUM_ELEMENTS; i += 2) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - - inStream = dfs.open(new Path(outPath + "/part-1-0")); - - br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 1; i < NUM_ELEMENTS; i += 2) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } - - // we use this to synchronize the clock changes to elements being processed - final static MultiShotLatch latch1 = new MultiShotLatch(); - final static MultiShotLatch latch2 = new MultiShotLatch(); - - /** - * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to - * produce rolling files. The clock of DateTimeBucketer is set to - * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using - * latches. - */ - @Test - public void testDateTimeRollingStringWriter() throws Exception { - final int NUM_ELEMENTS = 20; - final int PARALLELISM = 2; - final String outPath = hdfsURI + "/rolling-out"; - DateTimeBucketer.setClock(new ModifyableClock()); - ModifyableClock.setCurrentTime(0); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - - - - DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction( - NUM_ELEMENTS)) - .broadcast(); - - // the parallel flatMap is chained to the sink, so when it has seen 5 elements it can - // fire the latch - DataStream<String> mapped = source - .flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() { - private static final long serialVersionUID = 1L; - - int count = 0; - @Override - public void flatMap(Tuple2<Integer, String> value, - Collector<String> out) throws Exception { - out.collect(value.f1); - count++; - if (count >= 5) { - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - latch1.trigger(); - } else { - latch2.trigger(); - } - count = 0; - } - } - - }); - - RollingSink<String> sink = new RollingSink<String>(outPath) - .setBucketer(new DateTimeBucketer("ss")) - .setPartPrefix("part") - .setPendingPrefix("") - .setPendingSuffix(""); - - mapped.addSink(sink); - - env.execute("RollingSink String Write Test"); - - RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true); - - // we should have 8 rolling files, 4 time intervals and parallelism of 2 - int numFiles = 0; - while (files.hasNext()) { - LocatedFileStatus file = files.next(); - numFiles++; - if (file.getPath().toString().contains("rolling-out/00")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 0; i < 5; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/05")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 5; i < 10; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/10")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 10; i < 15; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/15")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 15; i < 20; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else { - Assert.fail("File " + file + " does not match any expected roll pattern."); - } - } - - Assert.assertEquals(8, numFiles); - } - - private static final String PART_PREFIX = "part"; - private static final String PENDING_SUFFIX = ".pending"; - private static final String IN_PROGRESS_SUFFIX = ".in-progress"; - private static final String VALID_LENGTH_SUFFIX = ".valid"; - - @Test - public void testBucketStateTransitions() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0); - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(0L); - - // we have a bucket size of 5 bytes, so each record will get its own bucket, - // i.e. the bucket should roll after every record. - - testHarness.processElement(new StreamRecord<>("test1", 1L)); - testHarness.processElement(new StreamRecord<>("test2", 1L)); - checkFs(outDir, 1, 1 ,0, 0); - - testHarness.processElement(new StreamRecord<>("test3", 1L)); - checkFs(outDir, 1, 2, 0, 0); - - testHarness.snapshot(0, 0); - checkFs(outDir, 1, 2, 0, 0); - - testHarness.notifyOfCompletedCheckpoint(0); - checkFs(outDir, 1, 0, 2, 0); - - OperatorStateHandles snapshot = testHarness.snapshot(1, 0); - - testHarness.close(); - checkFs(outDir, 0, 1, 2, 0); - - testHarness = createRescalingTestSink(outDir, 1, 0); - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - checkFs(outDir, 0, 0, 3, 1); - - snapshot = testHarness.snapshot(2, 0); - - testHarness.processElement(new StreamRecord<>("test4", 10)); - checkFs(outDir, 1, 0, 3, 1); - - testHarness = createRescalingTestSink(outDir, 1, 0); - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - - // the in-progress file remains as we do not clean up now - checkFs(outDir, 1, 0, 3, 1); - - testHarness.close(); - - // at close it is not moved to final because it is not part - // of the current task's state, it was just a not cleaned up leftover. - checkFs(outDir, 1, 0, 3, 1); - } - - @Test - public void testScalingDown() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0); - testHarness1.setup(); - testHarness1.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1); - testHarness2.setup(); - testHarness2.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); - testHarness3.setup(); - testHarness3.open(); - - testHarness1.processElement(new StreamRecord<>("test1", 0L)); - checkFs(outDir, 1, 0, 0, 0); - - testHarness2.processElement(new StreamRecord<>("test2", 0L)); - testHarness2.processElement(new StreamRecord<>("test3", 0L)); - testHarness2.processElement(new StreamRecord<>("test4", 0L)); - testHarness2.processElement(new StreamRecord<>("test5", 0L)); - testHarness2.processElement(new StreamRecord<>("test6", 0L)); - checkFs(outDir, 2, 4, 0, 0); - - testHarness3.processElement(new StreamRecord<>("test7", 0L)); - testHarness3.processElement(new StreamRecord<>("test8", 0L)); - checkFs(outDir, 3, 5, 0, 0); - - // intentionally we snapshot them in a not ascending order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( - testHarness3.snapshot(0, 0), - testHarness1.snapshot(0, 0), - testHarness2.snapshot(0, 0) - ); - - // with the above state reshuffling, we expect testHarness4 to take the - // state of the previous testHarness3 and testHarness1 while testHarness5 - // will take that of the previous testHarness1 - - OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0); - testHarness4.setup(); - testHarness4.initializeState(mergedSnapshot); - testHarness4.open(); - - // we do not have a length file for part-2-0 because bucket part-2-0 - // was not "in-progress", but "pending" (its full content is valid). - checkFs(outDir, 1, 4, 3, 2); - - OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1); - testHarness5.setup(); - testHarness5.initializeState(mergedSnapshot); - testHarness5.open(); - - checkFs(outDir, 0, 0, 8, 3); - } - - @Test - public void testScalingUp() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0); - testHarness1.setup(); - testHarness1.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0); - testHarness2.setup(); - testHarness2.open(); - - testHarness1.processElement(new StreamRecord<>("test1", 0L)); - testHarness1.processElement(new StreamRecord<>("test2", 0L)); - - checkFs(outDir, 1, 1, 0, 0); - - testHarness2.processElement(new StreamRecord<>("test3", 0L)); - testHarness2.processElement(new StreamRecord<>("test4", 0L)); - testHarness2.processElement(new StreamRecord<>("test5", 0L)); - - checkFs(outDir, 2, 3, 0, 0); - - // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( - testHarness2.snapshot(0, 0), - testHarness1.snapshot(0, 0) - ); - - testHarness1 = createRescalingTestSink(outDir, 3, 0); - testHarness1.setup(); - testHarness1.initializeState(mergedSnapshot); - testHarness1.open(); - - checkFs(outDir, 1, 1, 3, 1); - - testHarness2 = createRescalingTestSink(outDir, 3, 1); - testHarness2.setup(); - testHarness2.initializeState(mergedSnapshot); - testHarness2.open(); - - checkFs(outDir, 0, 0, 5, 2); - - OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); - testHarness3.setup(); - testHarness3.initializeState(mergedSnapshot); - testHarness3.open(); - - checkFs(outDir, 0, 0, 5, 2); - - testHarness1.processElement(new StreamRecord<>("test6", 0)); - testHarness2.processElement(new StreamRecord<>("test6", 0)); - testHarness3.processElement(new StreamRecord<>("test6", 0)); - - // 3 for the different tasks - checkFs(outDir, 3, 0, 5, 2); - - testHarness1.snapshot(1, 0); - testHarness2.snapshot(1, 0); - testHarness3.snapshot(1, 0); - - testHarness1.close(); - testHarness2.close(); - testHarness3.close(); - - checkFs(outDir, 0, 3, 5, 2); - } - - private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink( - File outDir, int totalParallelism, int taskIdx) throws Exception { - - RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath()) - .setWriter(new StringWriter<String>()) - .setBatchSize(5) - .setPartPrefix(PART_PREFIX) - .setInProgressPrefix("") - .setPendingPrefix("") - .setValidLengthPrefix("") - .setInProgressSuffix(IN_PROGRESS_SUFFIX) - .setPendingSuffix(PENDING_SUFFIX) - .setValidLengthSuffix(VALID_LENGTH_SUFFIX); - - return createTestSink(sink, totalParallelism, taskIdx); - } - - private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( - RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception { - return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); - } - - private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { - int inProg = 0; - int pend = 0; - int compl = 0; - int val = 0; - - for (File file: FileUtils.listFiles(outDir, null, true)) { - if (file.getAbsolutePath().endsWith("crc")) { - continue; - } - String path = file.getPath(); - if (path.endsWith(IN_PROGRESS_SUFFIX)) { - inProg++; - } else if (path.endsWith(PENDING_SUFFIX)) { - pend++; - } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { - val++; - } else if (path.contains(PART_PREFIX)) { - compl++; - } - } - - Assert.assertEquals(inprogress, inProg); - Assert.assertEquals(pending, pend); - Assert.assertEquals(completed, compl); - Assert.assertEquals(valid, val); - } - - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - private final int numElements; - - public TestSourceFunction(int numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < numElements && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } - } - - /** - * This waits on the two multi-shot latches. The latches are triggered in a parallel - * flatMap inside the test topology. - */ - private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - private final int numElements; - - public WaitingTestSourceFunction(int numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < numElements && running; i++) { - if (i % 5 == 0 && i > 0) { - // update the clock after "five seconds", so we get 20 seconds in total - // with 5 elements in each time window - latch1.await(); - latch2.await(); - ModifyableClock.setCurrentTime(i * 1000); - } - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } - } - - - private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> { - private String key; - private String expect; - public StreamWriterWithConfigCheck(String key, String expect) { - this.key = key; - this.expect = expect; - } - - @Override - public void open(FileSystem fs, Path path) throws IOException { - super.open(fs, path); - Assert.assertEquals(expect, fs.getConf().get(key)); - } - - @Override - public Writer<T> duplicate() { - return new StreamWriterWithConfigCheck<>(key, expect); - } - } - - public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - @Override - public boolean filter(Tuple2<Integer, String> value) throws Exception { - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - return value.f0 % 2 == 0; - } else { - return value.f0 % 2 == 1; - } - } - } - - public static class ModifyableClock implements Clock { - - private static volatile long currentTime = 0; - - public static void setCurrentTime(long currentTime) { - ModifyableClock.currentTime = currentTime; - } - - @Override - public long currentTimeMillis() { - return currentTime; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java deleted file mode 100644 index eb12d07..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.fs; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.runtime.security.SecurityUtils; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.SecureTestEnvironment; -import org.apache.flink.test.util.TestingSecurityContext; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.util.NetUtils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.VersionInfo; -import org.junit.AfterClass; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; - -/** - * Tests for running {@link RollingSinkSecuredITCase} which is an extension of {@link RollingSink} in secure environment - * Note: only executed for Hadoop version > 3.x.x - */ -public class RollingSinkSecuredITCase extends RollingSinkITCase { - - protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class); - - /** - * Skips all tests if the Hadoop version doesn't match. - * We can't run this test class until HDFS-9213 is fixed which allows a secure DataNode - * to bind to non-privileged ports for testing. - * For now, we skip this test class until Hadoop version 3.x.x. - */ - private static void skipIfHadoopVersionIsNotAppropriate() { - // Skips all tests if the Hadoop version doesn't match - String hadoopVersionString = VersionInfo.getVersion(); - String[] split = hadoopVersionString.split("\\."); - if (split.length != 3) { - throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString); - } - Assume.assumeTrue( - // check whether we're running Hadoop version >= 3.x.x - Integer.parseInt(split[0]) >= 3 - ); - } - - /* - * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations - * and out-of-order sequence for secure cluster - */ - @BeforeClass - public static void setup() throws Exception {} - - @AfterClass - public static void teardown() throws Exception {} - - @BeforeClass - public static void createHDFS() throws IOException {} - - @AfterClass - public static void destroyHDFS() {} - - @BeforeClass - public static void startSecureCluster() throws Exception { - - skipIfHadoopVersionIsNotAppropriate(); - - LOG.info("starting secure cluster environment for testing"); - - dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - - SecureTestEnvironment.prepare(tempFolder); - - populateSecureConfigurations(); - - Configuration flinkConfig = new Configuration(); - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, - SecureTestEnvironment.getTestKeytab()); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, - SecureTestEnvironment.getHadoopServicePrincipal()); - - SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig); - ctx.setHadoopConfiguration(conf); - try { - TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap()); - } catch (Exception e) { - throw new RuntimeException("Exception occurred while setting up secure test context. Reason: {}", e); - } - - File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml"); - - FileWriter writer = new FileWriter(hdfsSiteXML); - conf.writeXml(writer); - writer.flush(); - writer.close(); - - Map<String, String> map = new HashMap<String, String>(System.getenv()); - map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath()); - TestBaseUtils.setEnv(map); - - - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - builder.checkDataNodeAddrConfig(true); - builder.checkDataNodeHostConfig(true); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - hdfsURI = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/"; - - startSecureFlinkClusterWithRecoveryModeEnabled(); - } - - @AfterClass - public static void teardownSecureCluster() throws Exception { - LOG.info("tearing down secure cluster environment"); - - TestStreamEnvironment.unsetAsContext(); - stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); - - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - - SecureTestEnvironment.cleanup(); - } - - private static void populateSecureConfigurations() { - - String dataTransferProtection = "authentication"; - - SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf); - conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal()); - conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab()); - conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal()); - conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab()); - conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal()); - - conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); - - conf.set("dfs.data.transfer.protection", dataTransferProtection); - - conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name()); - - conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false"); - - conf.setInt("dfs.datanode.socket.write.timeout", 0); - - /* - * We ae setting the port number to privileged port - see HDFS-9213 - * This requires the user to have root privilege to bind to the port - * Use below command (ubuntu) to set privilege to java process for the - * bind() to work if the java process is not running as root. - * setcap 'cap_net_bind_service=+ep' /path/to/java - */ - conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002"); - conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost"); - conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003"); - } - - private static void startSecureFlinkClusterWithRecoveryModeEnabled() { - try { - LOG.info("Starting Flink and ZK in secure mode"); - - dfs.mkdirs(new Path("/flink/checkpoints")); - dfs.mkdirs(new Path("/flink/recovery")); - - org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); - - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); - config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); - config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); - - SecureTestEnvironment.populateFlinkSecureConfigurations(config); - - cluster = TestBaseUtils.startCluster(config, false); - TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); - - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /* For secure cluster testing, it is enough to run only one test and override below test methods - * to keep the overall build time minimal - */ - @Override - public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {} - - @Override - public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {} - - @Override - public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {} - - @Override - public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {} - - @Override - public void testDateTimeRollingStringWriter() throws Exception {} - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java deleted file mode 100644 index 54703a3..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs.bucketing; - -import com.google.common.collect.Sets; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; -import org.apache.flink.util.NetUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.BufferedReader; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link BucketingSink}. - * - * <p> - * This test only verifies the exactly once behaviour of the sink. Another test tests the - * rolling behaviour. - */ -public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase { - - final long NUM_STRINGS = 16_000; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - private static MiniDFSCluster hdfsCluster; - private static org.apache.hadoop.fs.FileSystem dfs; - - private static String outPath; - - private static final String PENDING_SUFFIX = ".pending"; - private static final String IN_PROGRESS_SUFFIX = ".in-progress"; - - @BeforeClass - public static void createHDFS() throws IOException { - Configuration conf = new Configuration(); - - File dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - outPath = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/string-non-rolling-out"; - } - - @AfterClass - public static void destroyHDFS() { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - - @Override - public void testProgram(StreamExecutionEnvironment env) { - assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); - - int PARALLELISM = 12; - - env.enableCheckpointing(20); - env.setParallelism(PARALLELISM); - env.disableOperatorChaining(); - - DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain(); - - DataStream<String> mapped = stream - .map(new OnceFailingIdentityMapper(NUM_STRINGS)); - - BucketingSink<String> sink = new BucketingSink<String>(outPath) - .setBucketer(new BasePathBucketer<String>()) - .setBatchSize(10000) - .setValidLengthPrefix("") - .setPendingPrefix("") - .setPendingSuffix(PENDING_SUFFIX) - .setInProgressSuffix(IN_PROGRESS_SUFFIX); - - mapped.addSink(sink); - - } - - @Override - public void postSubmit() throws Exception { - // We read the files and verify that we have read all the strings. If a valid-length - // file exists we only read the file to that point. (This test should work with - // FileSystems that support truncate() and with others as well.) - - Pattern messageRegex = Pattern.compile("message (\\d*)"); - - // Keep a set of the message IDs that we read. The size must equal the read count and - // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some - // elements twice. - Set<Integer> readNumbers = Sets.newHashSet(); - - HashSet<String> uniqMessagesRead = new HashSet<>(); - HashSet<String> messagesInCommittedFiles = new HashSet<>(); - - RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path( - outPath), true); - - while (files.hasNext()) { - LocatedFileStatus file = files.next(); - - if (!file.getPath().toString().endsWith(".valid-length")) { - int validLength = (int) file.getLen(); - if (dfs.exists(file.getPath().suffix(".valid-length"))) { - FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length")); - String validLengthString = inStream.readUTF(); - validLength = Integer.parseInt(validLengthString); - System.out.println("VALID LENGTH: " + validLength); - } - FSDataInputStream inStream = dfs.open(file.getPath()); - byte[] buffer = new byte[validLength]; - inStream.readFully(0, buffer, 0, validLength); - inStream.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(buffer); - - InputStreamReader inStreamReader = new InputStreamReader(bais); - BufferedReader br = new BufferedReader(inStreamReader); - - String line = br.readLine(); - while (line != null) { - Matcher matcher = messageRegex.matcher(line); - if (matcher.matches()) { - uniqMessagesRead.add(line); - - // check that in the committed files there are no duplicates - if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) { - if (!messagesInCommittedFiles.add(line)) { - Assert.fail("Duplicate entry in committed bucket."); - } - } - - int messageId = Integer.parseInt(matcher.group(1)); - readNumbers.add(messageId); - } else { - Assert.fail("Read line does not match expected pattern."); - } - line = br.readLine(); - } - br.close(); - inStreamReader.close(); - bais.close(); - } - } - - // Verify that we read all strings (at-least-once) - Assert.assertEquals(NUM_STRINGS, readNumbers.size()); - - // Verify that we don't have duplicates (boom!, exactly-once) - Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size()); - } - - private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> { - private static final long serialVersionUID = 1L; - - private static volatile boolean hasFailed = false; - - private final long numElements; - - private long failurePos; - private long count; - - - OnceFailingIdentityMapper(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(org.apache.flink.configuration.Configuration parameters) throws IOException { - long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - count = 0; - } - - @Override - public String map(String value) throws Exception { - count++; - if (!hasFailed && count >= failurePos) { - hasFailed = true; - throw new Exception("Test Failure"); - } - - return value; - } - } - - private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> { - - private static final long serialVersionUID = 1L; - - private final long numElements; - - private int index; - - private volatile boolean isRunning = true; - - - StringGeneratingSourceFunction(long numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<String> ctx) throws Exception { - final Object lockingObject = ctx.getCheckpointLock(); - - final int step = getRuntimeContext().getNumberOfParallelSubtasks(); - - if (index == 0) { - index = getRuntimeContext().getIndexOfThisSubtask(); - } - - while (isRunning && index < numElements) { - - Thread.sleep(1); - synchronized (lockingObject) { - ctx.collect("message " + index); - index += step; - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - private static String randomString(StringBuilder bld, Random rnd) { - final int len = rnd.nextInt(10) + 5; - - for (int i = 0; i < len; i++) { - char next = (char) (rnd.nextInt(20000) + 33); - bld.append(next); - } - - return bld.toString(); - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; - } - - @Override - public void restoreState(Integer state) { - index = state; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java deleted file mode 100644 index d671874..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ /dev/null @@ -1,867 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs.bucketing; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileConstants; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; -import org.apache.flink.streaming.connectors.fs.Clock; -import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; -import org.apache.flink.streaming.connectors.fs.StringWriter; -import org.apache.flink.streaming.connectors.fs.Writer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.util.NetUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.Assert; -import org.junit.rules.TemporaryFolder; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; - -public class BucketingSinkTest { - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - private static MiniDFSCluster hdfsCluster; - private static org.apache.hadoop.fs.FileSystem dfs; - private static String hdfsURI; - - private static final String PART_PREFIX = "part"; - private static final String PENDING_SUFFIX = ".pending"; - private static final String IN_PROGRESS_SUFFIX = ".in-progress"; - private static final String VALID_LENGTH_SUFFIX = ".valid"; - - private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink( - File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception { - - BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath()) - .setBucketer(new Bucketer<String>() { - private static final long serialVersionUID = 1L; - - @Override - public Path getBucketPath(Clock clock, Path basePath, String element) { - return new Path(basePath, element); - } - }) - .setWriter(new StringWriter<String>()) - .setInactiveBucketCheckInterval(inactivityInterval) - .setInactiveBucketThreshold(inactivityInterval) - .setPartPrefix(PART_PREFIX) - .setInProgressPrefix("") - .setPendingPrefix("") - .setValidLengthPrefix("") - .setInProgressSuffix(IN_PROGRESS_SUFFIX) - .setPendingSuffix(PENDING_SUFFIX) - .setValidLengthSuffix(VALID_LENGTH_SUFFIX); - - return createTestSink(sink, totalParallelism, taskIdx); - } - - private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, int totalParallelism, int taskIdx) throws Exception { - BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath()) - .setBucketer(new Bucketer<String>() { - private static final long serialVersionUID = 1L; - - @Override - public Path getBucketPath(Clock clock, Path basePath, String element) { - return new Path(basePath, element); - } - }) - .setWriter(new StringWriter<String>()) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setInactiveBucketCheckInterval(5*60*1000L) - .setInactiveBucketThreshold(5*60*1000L) - .setPendingSuffix(PENDING_SUFFIX) - .setInProgressSuffix(IN_PROGRESS_SUFFIX); - - return createTestSink(sink, totalParallelism, taskIdx); - } - - private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( - BucketingSink<T> sink, int totalParallelism, int taskIdx) throws Exception { - return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); - } - - @BeforeClass - public static void createHDFS() throws IOException { - Configuration conf = new Configuration(); - - File dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - hdfsURI = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/"; - } - - @AfterClass - public static void destroyHDFS() { - hdfsCluster.shutdown(); - } - - @Test - public void testInactivityPeriodWithLateNotify() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(0L); - - testHarness.processElement(new StreamRecord<>("test1", 1L)); - testHarness.processElement(new StreamRecord<>("test2", 1L)); - checkFs(outDir, 2, 0 ,0, 0); - - testHarness.setProcessingTime(101L); // put some in pending - checkFs(outDir, 0, 2, 0, 0); - - testHarness.snapshot(0, 0); // put them in pending for 0 - checkFs(outDir, 0, 2, 0, 0); - - testHarness.processElement(new StreamRecord<>("test3", 1L)); - testHarness.processElement(new StreamRecord<>("test4", 1L)); - - testHarness.setProcessingTime(202L); // put some in pending - - testHarness.snapshot(1, 0); // put them in pending for 1 - checkFs(outDir, 0, 4, 0, 0); - - testHarness.notifyOfCompletedCheckpoint(0); // put the pending for 0 to the "committed" state - checkFs(outDir, 0, 2, 2, 0); - - testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state - checkFs(outDir, 0, 0, 4, 0); - } - - @Test - public void testBucketStateTransitions() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(0L); - - testHarness.processElement(new StreamRecord<>("test1", 1L)); - testHarness.processElement(new StreamRecord<>("test2", 1L)); - checkFs(outDir, 2, 0 ,0, 0); - - // this is to check the inactivity threshold - testHarness.setProcessingTime(101L); - checkFs(outDir, 0, 2, 0, 0); - - testHarness.processElement(new StreamRecord<>("test3", 1L)); - checkFs(outDir, 1, 2, 0, 0); - - testHarness.snapshot(0, 0); - checkFs(outDir, 1, 2, 0, 0); - - testHarness.notifyOfCompletedCheckpoint(0); - checkFs(outDir, 1, 0, 2, 0); - - OperatorStateHandles snapshot = testHarness.snapshot(1, 0); - - testHarness.close(); - checkFs(outDir, 0, 1, 2, 0); - - testHarness = createRescalingTestSink(outDir, 1, 0, 100); - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - checkFs(outDir, 0, 0, 3, 1); - - snapshot = testHarness.snapshot(2, 0); - - testHarness.processElement(new StreamRecord<>("test4", 10)); - checkFs(outDir, 1, 0, 3, 1); - - testHarness = createRescalingTestSink(outDir, 1, 0, 100); - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - - // the in-progress file remains as we do not clean up now - checkFs(outDir, 1, 0, 3, 1); - - testHarness.close(); - - // at close it is not moved to final because it is not part - // of the current task's state, it was just a not cleaned up leftover. - checkFs(outDir, 1, 0, 3, 1); - } - - @Test - public void testSameParallelismWithShufflingStates() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); - testHarness1.setup(); - testHarness1.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); - testHarness2.setup(); - testHarness2.open(); - - testHarness1.processElement(new StreamRecord<>("test1", 0L)); - checkFs(outDir, 1, 0, 0, 0); - - testHarness2.processElement(new StreamRecord<>("test2", 0L)); - checkFs(outDir, 2, 0, 0, 0); - - // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( - testHarness2.snapshot(0, 0), - testHarness1.snapshot(0, 0) - ); - - checkFs(outDir, 2, 0, 0, 0); - - // this will not be included in any checkpoint so it can be cleaned up (although we do not) - testHarness2.processElement(new StreamRecord<>("test3", 0L)); - checkFs(outDir, 3, 0, 0, 0); - - testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); - testHarness1.setup(); - testHarness1.initializeState(mergedSnapshot); - testHarness1.open(); - - // the one in-progress will be the one assigned to the next instance, - // the other is the test3 which is just not cleaned up - checkFs(outDir, 2, 0, 1, 1); - - testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); - testHarness2.setup(); - testHarness2.initializeState(mergedSnapshot); - testHarness2.open(); - - checkFs(outDir, 1, 0, 2, 2); - - testHarness1.close(); - testHarness2.close(); - - // the 1 in-progress can be discarded. - checkFs(outDir, 1, 0, 2, 2); - } - - @Test - public void testScalingDown() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); - testHarness1.setup(); - testHarness1.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); - testHarness2.setup(); - testHarness2.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); - testHarness3.setup(); - testHarness3.open(); - - testHarness1.processElement(new StreamRecord<>("test1", 0L)); - checkFs(outDir, 1, 0, 0, 0); - - testHarness2.processElement(new StreamRecord<>("test2", 0L)); - checkFs(outDir, 2, 0, 0, 0); - - testHarness3.processElement(new StreamRecord<>("test3", 0L)); - testHarness3.processElement(new StreamRecord<>("test4", 0L)); - checkFs(outDir, 4, 0, 0, 0); - - // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( - testHarness3.snapshot(0, 0), - testHarness1.snapshot(0, 0), - testHarness2.snapshot(0, 0) - ); - - testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); - testHarness1.setup(); - testHarness1.initializeState(mergedSnapshot); - testHarness1.open(); - - checkFs(outDir, 1, 0, 3, 3); - - testHarness2 = createRescalingTestSink(outDir, 2, 1, 100); - testHarness2.setup(); - testHarness2.initializeState(mergedSnapshot); - testHarness2.open(); - - checkFs(outDir, 0, 0, 4, 4); - } - - @Test - public void testScalingUp() throws Exception { - final File outDir = tempFolder.newFolder(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0, 100); - testHarness1.setup(); - testHarness1.open(); - - OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0, 100); - testHarness2.setup(); - testHarness2.open(); - - testHarness1.processElement(new StreamRecord<>("test1", 1L)); - testHarness1.processElement(new StreamRecord<>("test2", 1L)); - - checkFs(outDir, 2, 0, 0, 0); - - testHarness2.processElement(new StreamRecord<>("test3", 1L)); - testHarness2.processElement(new StreamRecord<>("test4", 1L)); - testHarness2.processElement(new StreamRecord<>("test5", 1L)); - - checkFs(outDir, 5, 0, 0, 0); - - // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( - testHarness2.snapshot(0, 0), - testHarness1.snapshot(0, 0) - ); - - testHarness1 = createRescalingTestSink(outDir, 3, 0, 100); - testHarness1.setup(); - testHarness1.initializeState(mergedSnapshot); - testHarness1.open(); - - checkFs(outDir, 2, 0, 3, 3); - - testHarness2 = createRescalingTestSink(outDir, 3, 1, 100); - testHarness2.setup(); - testHarness2.initializeState(mergedSnapshot); - testHarness2.open(); - - checkFs(outDir, 0, 0, 5, 5); - - OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100); - testHarness3.setup(); - testHarness3.initializeState(mergedSnapshot); - testHarness3.open(); - - checkFs(outDir, 0, 0, 5, 5); - - testHarness1.processElement(new StreamRecord<>("test6", 0)); - testHarness2.processElement(new StreamRecord<>("test6", 0)); - testHarness3.processElement(new StreamRecord<>("test6", 0)); - - checkFs(outDir, 3, 0, 5, 5); - - testHarness1.snapshot(1, 0); - testHarness2.snapshot(1, 0); - testHarness3.snapshot(1, 0); - - testHarness1.close(); - testHarness2.close(); - testHarness3.close(); - - checkFs(outDir, 0, 3, 5, 5); - } - - private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { - int inProg = 0; - int pend = 0; - int compl = 0; - int val = 0; - - for (File file: FileUtils.listFiles(outDir, null, true)) { - if (file.getAbsolutePath().endsWith("crc")) { - continue; - } - String path = file.getPath(); - if (path.endsWith(IN_PROGRESS_SUFFIX)) { - inProg++; - } else if (path.endsWith(PENDING_SUFFIX)) { - pend++; - } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { - val++; - } else if (path.contains(PART_PREFIX)) { - compl++; - } - } - - Assert.assertEquals(inprogress, inProg); - Assert.assertEquals(pending, pend); - Assert.assertEquals(completed, compl); - Assert.assertEquals(valid, val); - } - - /** - * This tests {@link StringWriter} with - * non-bucketing output. - */ - @Test - public void testNonRollingStringWriter() throws Exception { - final String outPath = hdfsURI + "/string-non-rolling-out"; - - final int numElements = 20; - - BucketingSink<String> sink = new BucketingSink<String>(outPath) - .setBucketer(new BasePathBucketer<String>()) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setPendingSuffix(""); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i))); - } - - testHarness.close(); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 0; i < numElements; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } - - /** - * This tests {@link SequenceFileWriter} - * with non-rolling output and without compression. - */ - @Test - public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception { - final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out"; - - final int numElements = 20; - - BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath) - .setWriter(new SequenceFileWriter<IntWritable, Text>()) - .setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>()) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setPendingSuffix(""); - - sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig()); - - OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness = - createTestSink(sink, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - testHarness.processElement(new StreamRecord<>(Tuple2.of( - new IntWritable(i), - new Text("message #" + Integer.toString(i)) - ))); - } - - testHarness.close(); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); - - SequenceFile.Reader reader = new SequenceFile.Reader(inStream, 1000, 0, 100000, new Configuration()); - - IntWritable intWritable = new IntWritable(); - Text txt = new Text(); - - for (int i = 0; i < numElements; i++) { - reader.next(intWritable, txt); - Assert.assertEquals(i, intWritable.get()); - Assert.assertEquals("message #" + i, txt.toString()); - } - - reader.close(); - inStream.close(); - } - - /** - * This tests {@link AvroKeyValueSinkWriter} - * with non-rolling output and with compression. - */ - @Test - public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception { - final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out"; - - final int numElements = 20; - - Map<String, String> properties = new HashMap<>(); - Schema keySchema = Schema.create(Schema.Type.INT); - Schema valueSchema = Schema.create(Schema.Type.STRING); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); - - BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath) - .setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties)) - .setBucketer(new BasePathBucketer<Tuple2<Integer, String>>()) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setPendingSuffix(""); - - OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = - createTestSink(sink, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - testHarness.processElement(new StreamRecord<>(Tuple2.of( - i, "message #" + Integer.toString(i) - ))); - } - - testHarness.close(); - - GenericData.setStringType(valueSchema, GenericData.StringType.String); - Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); - - SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema); - DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader); - for (int i = 0; i < numElements; i++) { - AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry = - new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); - int key = wrappedEntry.getKey(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - } - - /** - * This uses {@link DateTimeBucketer} to - * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually - * advance processing time. - */ - @Test - public void testDateTimeRollingStringWriter() throws Exception { - final int numElements = 20; - - final String outPath = hdfsURI + "/rolling-out"; - - BucketingSink<String> sink = new BucketingSink<String>(outPath) - .setBucketer(new DateTimeBucketer<String>("ss")) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setPendingSuffix(""); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - // Every 5 elements, increase the clock time. We should end up with 5 elements per bucket. - if (i % 5 == 0) { - testHarness.setProcessingTime(i * 1000L); - } - testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i))); - } - - testHarness.close(); - - RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true); - - // We should have 4 rolling files across 4 time intervals - int numFiles = 0; - while (files.hasNext()) { - LocatedFileStatus file = files.next(); - numFiles++; - if (file.getPath().toString().contains("rolling-out/00")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 0; i < 5; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/05")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 5; i < 10; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/10")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 10; i < 15; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else if (file.getPath().toString().contains("rolling-out/15")) { - FSDataInputStream inStream = dfs.open(file.getPath()); - - BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); - - for (int i = 15; i < 20; i++) { - String line = br.readLine(); - Assert.assertEquals("message #" + i, line); - } - - inStream.close(); - } else { - Assert.fail("File " + file + " does not match any expected roll pattern."); - } - } - - Assert.assertEquals(4, numFiles); - } - - /** - * This uses a custom bucketing function which determines the bucket from the input. - */ - @Test - public void testCustomBucketing() throws Exception { - File dataDir = tempFolder.newFolder(); - - final int numIds = 4; - final int numElements = 20; - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - testHarness.processElement(new StreamRecord<>(Integer.toString(i % numIds))); - } - - testHarness.close(); - - // we should have 4 buckets, with 1 file each - int numFiles = 0; - for (File file: FileUtils.listFiles(dataDir, null, true)) { - if (file.getName().startsWith(PART_PREFIX)) { - numFiles++; - } - } - - Assert.assertEquals(4, numFiles); - } - - /** - * This uses a custom bucketing function which determines the bucket from the input. - * We use a simulated clock to reduce the number of buckets being written to over time. - * This causes buckets to become 'inactive' and their file parts 'closed' by the sink. - */ - @Test - public void testCustomBucketingInactiveBucketCleanup() throws Exception { - File dataDir = tempFolder.newFolder(); - - final int step1NumIds = 4; - final int step2NumIds = 2; - final int numElementsPerStep = 20; - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElementsPerStep; i++) { - testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds))); - } - - testHarness.setProcessingTime(2*60*1000L); - - for (int i = 0; i < numElementsPerStep; i++) { - testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); - } - - testHarness.setProcessingTime(6*60*1000L); - - for (int i = 0; i < numElementsPerStep; i++) { - testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); - } - - // we should have 4 buckets, with 1 file each - // 2 of these buckets should have been finalised due to becoming inactive - int numFiles = 0; - int numInProgress = 0; - for (File file: FileUtils.listFiles(dataDir, null, true)) { - if (file.getAbsolutePath().endsWith("crc")) { - continue; - } - if (file.getPath().endsWith(IN_PROGRESS_SUFFIX)) { - numInProgress++; - } - numFiles++; - } - - testHarness.close(); - - Assert.assertEquals(4, numFiles); - Assert.assertEquals(2, numInProgress); - } - - /** - * This tests user defined hdfs configuration - * @throws Exception - */ - @Test - public void testUserDefinedConfiguration() throws Exception { - final String outPath = hdfsURI + "/string-non-rolling-with-config"; - final int numElements = 20; - - Map<String, String> properties = new HashMap<>(); - Schema keySchema = Schema.create(Schema.Type.INT); - Schema valueSchema = Schema.create(Schema.Type.STRING); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); - - Configuration conf = new Configuration(); - conf.set("io.file.buffer.size", "40960"); - - BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath) - .setFSConfig(conf) - .setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960")) - .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>()) - .setPartPrefix(PART_PREFIX) - .setPendingPrefix("") - .setPendingSuffix(""); - - OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = - createTestSink(sink, 1, 0); - - testHarness.setProcessingTime(0L); - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < numElements; i++) { - testHarness.processElement(new StreamRecord<>(Tuple2.of( - i, "message #" + Integer.toString(i) - ))); - } - - testHarness.close(); - - GenericData.setStringType(valueSchema, GenericData.StringType.String); - Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); - - FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0")); - - SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema); - DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader); - for (int i = 0; i < numElements; i++) { - AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry = - new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); - int key = wrappedEntry.getKey(); - Assert.assertEquals(i, key); - String value = wrappedEntry.getValue(); - Assert.assertEquals("message #" + i, value); - } - - dataFileStream.close(); - inStream.close(); - } - - private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> { - private Map<String, String> properties; - private String key; - private String expect; - public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) { - super(properties); - this.properties = properties; - this.key = key; - this.expect = expect; - } - - @Override - public void open(FileSystem fs, Path path) throws IOException { - super.open(fs, path); - Assert.assertEquals(expect, fs.getConf().get(key)); - } - - @Override - public Writer<Tuple2<K, V>> duplicate() { - return new StreamWriterWithConfigCheck<>(properties, key, expect); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties deleted file mode 100644 index 5c22851..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - -log4j.logger.org.apache.directory=OFF, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file
