[FLINK-4378] Allow Setting Custom Configuration in RollingSink

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8b162e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8b162e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8b162e8

Branch: refs/heads/master
Commit: f8b162e8f36373e16916798e3cc899c190561967
Parents: fd08ad2
Author: wenlong.lwl <[email protected]>
Authored: Thu Aug 11 21:26:01 2016 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 28 10:56:24 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 67 ++++++++++++---
 .../connectors/fs/RollingSinkITCase.java        | 88 +++++++++++++++++++-
 2 files changed, 139 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 738857f..b959bf8 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -284,9 +284,16 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
         * current part file path, the valid length of the in-progress files 
and pending part files.
         */
        private transient BucketState bucketState;
-       
-       private transient org.apache.hadoop.conf.Configuration hadoopConf;
-       
+
+       /**
+        * User-defined FileSystem parameters; {@code null} if the user did not set any.
+        */
+       private Configuration fsConfig = null;
+
+       /**
+        * The FileSystem reference.
+        */
+       private transient FileSystem fs;
        /**
         * Creates a new {@code RollingSink} that writes files to the given 
base directory.
         *
@@ -303,6 +310,28 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                this.writerTemplate = new StringWriter<>();
        }
 
+       /**
+        * Specify a custom {@code Configuration} that will be used when creating
+        * the {@link FileSystem} for writing.
+        *
+        * <p>The entries of {@code config} are copied into a new {@code Configuration}.
+        * The copy is completed before it is stored in the sink, so a failing call
+        * leaves a previously set configuration untouched.
+        *
+        * @param config the configuration whose entries are copied; must not be {@code null}
+        * @return this sink, so that setter calls can be chained
+        * @throws NullPointerException if {@code config} is {@code null}
+        */
+       public RollingSink<T> setFSConfig(Configuration config) {
+               if (config == null) {
+                       throw new NullPointerException("config must not be null");
+               }
+               // Fill the copy first; only publish it to the field once it is complete.
+               Configuration copy = new Configuration();
+               copy.addAll(config);
+               this.fsConfig = copy;
+               return this;
+       }
+
+       /**
+        * Specify a custom Hadoop {@code Configuration} that will be used when creating
+        * the {@link FileSystem} for writing.
+        *
+        * <p>Every key/value entry of {@code config} is copied as a string into a new
+        * Flink {@code Configuration}. The copy is completed before it is stored in the
+        * sink, so a failing call leaves a previously set configuration untouched.
+        *
+        * @param config the Hadoop configuration whose entries are copied; must not be {@code null}
+        * @return this sink, so that setter calls can be chained
+        * @throws NullPointerException if {@code config} is {@code null}
+        */
+       public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+               if (config == null) {
+                       throw new NullPointerException("config must not be null");
+               }
+               // Fill the copy first; only publish it to the field once it is complete.
+               Configuration copy = new Configuration();
+               for (Map.Entry<String, String> entry : config) {
+                       copy.setString(entry.getKey(), entry.getValue());
+               }
+               this.fsConfig = copy;
+               return this;
+       }
+
        @Override
        @SuppressWarnings("unchecked")
        public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
@@ -324,8 +353,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                        bucketState = new BucketState();
                }
 
-               hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-               FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+               initFileSystem();
                refTruncate = reflectTruncate(fs);
 
                // delete pending/in-progress files that might be left if we 
fail while
@@ -358,6 +386,27 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                }
        }
 
+       /**
+        * Creates the {@link FileSystem} for {@code basePath} and stores it in {@code fs}.
+        * Does nothing if {@code fs} is already set.
+        *
+        * <p>If the user supplied a configuration via {@code setFSConfig}, its entries are
+        * applied on top of the Hadoop configuration and the FileSystem cache is disabled
+        * for the scheme being used, so that the FileSystem is created from this
+        * configuration rather than taken from Hadoop's cache. An explicit user value for
+        * the disable-cache key is kept as is.
+        *
+        * @throws IOException if the FileSystem cannot be created
+        */
+       private void initFileSystem() throws IOException {
+               if (fs != null) {
+                       return;
+               }
+               Path base = new Path(basePath);
+               org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+               if (fsConfig != null) {
+                       for (String key : fsConfig.keySet()) {
+                               hadoopConf.set(key, fsConfig.getString(key, null));
+                       }
+
+                       // A base path without a scheme is resolved against the default FileSystem,
+                       // so the cache has to be disabled for that scheme (not for "null").
+                       String scheme = base.toUri().getScheme();
+                       if (scheme == null) {
+                               scheme = FileSystem.getDefaultUri(hadoopConf).getScheme();
+                       }
+                       String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
+                       if (!fsConfig.containsKey(disableCacheName)) {
+                               hadoopConf.setBoolean(disableCacheName, true);
+                       }
+               }
+
+               fs = base.getFileSystem(hadoopConf);
+       }
+
        @Override
        public void close() throws Exception {
 //             boolean interrupted = Thread.interrupted();
@@ -420,8 +469,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
        private void openNewPartFile() throws Exception {
                closeCurrentPartFile();
 
-               FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
                Path newBucketDirectory = bucketer.getNextBucketPath(new 
Path(basePath));
 
                if (!newBucketDirectory.equals(currentBucketDirectory)) {
@@ -451,7 +498,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                LOG.debug("Next part path is {}", currentPartPath.toString());
 
                Path inProgressPath = new Path(currentPartPath.getParent(), 
inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
-
                writer.open(fs, inProgressPath);
                isWriterOpen = true;
        }
@@ -472,7 +518,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                if (currentPartPath != null) {
                        Path inProgressPath = new 
Path(currentPartPath.getParent(), inProgressPrefix + 
currentPartPath.getName()).suffix(inProgressSuffix);
                        Path pendingPath = new 
Path(currentPartPath.getParent(), pendingPrefix + 
currentPartPath.getName()).suffix(pendingSuffix);
-                       FileSystem fs = 
inProgressPath.getFileSystem(hadoopConf);
                        fs.rename(inProgressPath, pendingPath);
                        LOG.debug("Moving in-progress bucket {} to pending file 
{}",
                                        inProgressPath,
@@ -547,7 +592,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                                Path pendingPath = new 
Path(finalPath.getParent(),
                                                                pendingPrefix + 
finalPath.getName()).suffix(pendingSuffix);
 
-                                               FileSystem fs = 
pendingPath.getFileSystem(hadoopConf);
                                                fs.rename(pendingPath, 
finalPath);
                                                LOG.debug(
                                                                "Moving pending 
file {} to final location after complete checkpoint {}.",
@@ -583,9 +627,8 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                // we can clean all the pending files since they where renamed 
to final files
                // after this checkpoint was successfull
                bucketState.pendingFiles.clear();
-               FileSystem fs = null;
                try {
-                       fs = new 
Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+                       initFileSystem();
                } catch (IOException e) {
                        LOG.error("Error while creating FileSystem in 
checkpoint restore.", e);
                        throw new RuntimeException("Error while creating 
FileSystem in checkpoint restore.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/f8b162e8/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index c3c8df5..c8440ef 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -21,12 +21,10 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -42,11 +40,11 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -457,7 +455,68 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                dataFileStream.close();
                inStream.close();
        }
-       
+
+
+       /**
+        * This tests that a user-defined configuration set via {@code setFSConfig} is
+        * visible in the configuration of the FileSystem handed to the writer
+        * (checked by {@code StreamWriterWithConfigCheck}) and that the sink still
+        * writes the expected part files.
+        *
+        * @throws Exception if the job or the verification of the output fails
+        */
+       @Test
+       public void testUserDefinedConfiguration() throws Exception {
+               final int NUM_ELEMENTS = 20;
+               final int PARALLELISM = 2;
+               final String outPath = hdfsURI + "/string-non-rolling-with-config";
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(PARALLELISM);
+
+               DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+                       .broadcast()
+                       .filter(new OddEvenFilter());
+
+               Configuration conf = new Configuration();
+               conf.set("io.file.buffer.size", "40960");
+               RollingSink<String> sink = new RollingSink<String>(outPath)
+                       .setFSConfig(conf)
+                       .setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
+                       .setBucketer(new NonRollingBucketer())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               source
+                       .map(new MapFunction<Tuple2<Integer,String>, String>() {
+                               private static final long serialVersionUID = 1L;
+                               @Override
+                               public String map(Tuple2<Integer, String> value) throws Exception {
+                                       return value.f1;
+                               }
+                       })
+                       .addSink(sink);
+
+               env.execute("RollingSink with configuration Test");
+
+               // part-0-0 is expected to hold the even elements, part-1-0 the odd ones.
+               // try-with-resources closes stream and reader even if an assertion fails.
+               for (int part = 0; part < PARALLELISM; part++) {
+                       try (FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-" + part + "-0"));
+                                       BufferedReader br = new BufferedReader(new InputStreamReader(inStream))) {
+                               for (int i = part; i < NUM_ELEMENTS; i += 2) {
+                                       String line = br.readLine();
+                                       Assert.assertEquals("message #" + i, line);
+                               }
+                       }
+               }
+       }
 
        // we use this to synchronize the clock changes to elements being 
processed
        final static MultiShotLatch latch1 = new MultiShotLatch();
@@ -639,6 +698,27 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                }
        }
 
+
+       /**
+        * A {@link StringWriter} that, after opening a part file, asserts that the
+        * configuration of the given FileSystem holds the expected value for a key.
+        *
+        * @param <T> the type of the elements that are written
+        */
+       private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+               // declared like the other serializable helper classes in this test
+               private static final long serialVersionUID = 1L;
+
+               /** Configuration key to look up in the FileSystem configuration. */
+               private final String key;
+               /** Value that the FileSystem configuration must hold for {@code key}. */
+               private final String expect;
+
+               public StreamWriterWithConfigCheck(String key, String expect) {
+                       this.key = key;
+                       this.expect = expect;
+               }
+
+               @Override
+               public void open(FileSystem fs, Path path) throws IOException {
+                       super.open(fs, path);
+                       Assert.assertEquals(expect, fs.getConf().get(key));
+               }
+
+               @Override
+               public Writer<T> duplicate() {
+                       // hand key and expected value on to the copy so it performs the same check
+                       return new StreamWriterWithConfigCheck<>(key, expect);
+               }
+       }
+
        public static class OddEvenFilter extends 
RichFilterFunction<Tuple2<Integer, String>> {
                private static final long serialVersionUID = 1L;
 

Reply via email to