[FLINK-4378] Allow Setting Custom Configuration in RollingSink
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8b162e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8b162e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8b162e8 Branch: refs/heads/master Commit: f8b162e8f36373e16916798e3cc899c190561967 Parents: fd08ad2 Author: wenlong.lwl <[email protected]> Authored: Thu Aug 11 21:26:01 2016 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 28 10:56:24 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/fs/RollingSink.java | 67 ++++++++++++--- .../connectors/fs/RollingSinkITCase.java | 88 +++++++++++++++++++- 2 files changed, 139 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index 738857f..b959bf8 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -284,9 +284,16 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf * current part file path, the valid length of the in-progress files and pending part files. */ private transient BucketState bucketState; - - private transient org.apache.hadoop.conf.Configuration hadoopConf; - + + /** + * User-defined FileSystem parameters. + */ + private Configuration fsConfig = null; + + /** + * The FileSystem reference. + */ + private transient FileSystem fs; /** * Creates a new {@code RollingSink} that writes files to the given base directory. * @@ -303,6 +310,28 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf this.writerTemplate = new StringWriter<>(); } + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public RollingSink<T> setFSConfig(Configuration config) { + this.fsConfig = new Configuration(); + fsConfig.addAll(config); + return this; + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) { + this.fsConfig = new Configuration(); + for(Map.Entry<String, String> entry : config) { + fsConfig.setString(entry.getKey(), entry.getValue()); + }; + return this; + } + @Override @SuppressWarnings("unchecked") public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { @@ -324,8 +353,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf bucketState = new BucketState(); } - hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); + initFileSystem(); refTruncate = reflectTruncate(fs); // delete pending/in-progress files that might be left if we fail while @@ -358,6 +386,27 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } } + /** + * create a file system with the user defined hdfs config + * @throws IOException + */ + private void initFileSystem() throws IOException { + if(fs != null) { + return; + } + org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if(fsConfig != null) { + String disableCacheName + = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()}); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : fsConfig.keySet()) { + hadoopConf.set(key, fsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + @Override public void close() throws Exception { // boolean interrupted = Thread.interrupted(); @@ -420,8 +469,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf private void openNewPartFile() throws Exception { closeCurrentPartFile(); - FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); - Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath)); if (!newBucketDirectory.equals(currentBucketDirectory)) { @@ -451,7 +498,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf LOG.debug("Next part path is {}", currentPartPath.toString()); Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); - writer.open(fs, inProgressPath); isWriterOpen = true; } @@ -472,7 +518,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf if (currentPartPath != null) { Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); - FileSystem fs = inProgressPath.getFileSystem(hadoopConf); fs.rename(inProgressPath, pendingPath); LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, @@ -547,7 +592,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); - FileSystem fs = pendingPath.getFileSystem(hadoopConf); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location after complete checkpoint {}.", @@ -583,9 +627,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf // we can clean all the pending files since they where renamed to final files // after this checkpoint was successfull bucketState.pendingFiles.clear(); - FileSystem fs = null; try { - fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration()); + initFileSystem(); } catch (IOException e) { LOG.error("Error while creating FileSystem in checkpoint restore.", e); throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index c3c8df5..c8440ef 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -21,12 +21,10 @@ package org.apache.flink.streaming.connectors.fs; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.file.DataFileConstants; -import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericData.StringType; -import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFilterFunction; @@ -42,11 +40,11 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; @@ -457,7 +455,68 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { dataFileStream.close(); inStream.close(); } - + + + /** + * This tests user defined hdfs configuration + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); + RollingSink<String> sink = new RollingSink<String>(outPath) + .setFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960")) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source + .map(new MapFunction<Tuple2<Integer,String>, String>() { + private static final long serialVersionUID = 1L; + @Override + public String map(Tuple2<Integer, String> value) throws Exception { + return value.f1; + } + }) + .addSink(sink); + + env.execute("RollingSink with configuration Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + + br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } // we use this to synchronize the clock changes to elements being processed final static MultiShotLatch latch1 = new MultiShotLatch(); @@ -639,6 +698,27 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { } } + + private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> { + private String key; + private String expect; + public StreamWriterWithConfigCheck(String key, String expect) { + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + Assert.assertEquals(expect, fs.getConf().get(key)); + } + + @Override + public Writer<T> duplicate() { + return new StreamWriterWithConfigCheck<>(key, expect); + } + } + public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> { private static final long serialVersionUID = 1L;
