Repository: flink
Updated Branches:
  refs/heads/master fd08ad2e7 -> e9b20ec21


[FLINK-4378] Allow Setting Custom Configuration in BucketingSink


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9b20ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9b20ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9b20ec2

Branch: refs/heads/master
Commit: e9b20ec21ddd5f7d1440bccfb8622dcb47443fef
Parents: f8b162e
Author: wenlong.lwl <[email protected]>
Authored: Thu Oct 27 11:44:41 2016 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 28 10:56:24 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 63 ++++++++++++--
 .../fs/bucketing/BucketingSinkTest.java         | 90 ++++++++++++++++++++
 2 files changed, 144 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 52de00d..0791fb5 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -281,7 +281,15 @@ public class BucketingSink<T>
         */
        private transient State<T> state;
 
-       private transient org.apache.hadoop.conf.Configuration hadoopConf;
+       /**
+        * User-defined FileSystem parameters
+        */
+       private Configuration fsConfig = null;
+
+       /**
+        * The FileSystem reference.
+        */
+       private transient FileSystem fs;
 
        private transient Clock clock;
 
@@ -302,6 +310,28 @@ public class BucketingSink<T>
                this.writerTemplate = new StringWriter<>();
        }
 
+       /**
+        * Specify a custom {@code Configuration} that will be used when creating
+        * the {@link FileSystem} for writing.
+        */
+       public BucketingSink<T> setFSConfig(Configuration config) {
+               this.fsConfig = new Configuration();
+               fsConfig.addAll(config);
+               return this;
+       }
+
+       /**
+        * Specify a custom {@code Configuration} that will be used when creating
+        * the {@link FileSystem} for writing.
+        */
+       public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
+               this.fsConfig = new Configuration();
+               for(Map.Entry<String, String> entry : config) {
+                       fsConfig.setString(entry.getKey(), entry.getValue());
+               };
+               return this;
+       }
+
        @Override
        @SuppressWarnings("unchecked")
        public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -319,8 +349,7 @@ public class BucketingSink<T>
                state = new State<T>();
 
                Path baseDirectory = new Path(basePath);
-               hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-               FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
+               initFileSystem();
                refTruncate = reflectTruncate(fs);
 
                processingTimeService =
@@ -369,6 +398,27 @@ public class BucketingSink<T>
                }
        }
 
+       /**
+        * create a file system with the user defined hdfs config
+        * @throws IOException
+        */
+       private void initFileSystem() throws IOException {
+               if(fs != null) {
+                       return;
+               }
+               org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+               if(fsConfig != null) {
+                       String disableCacheName
+                               = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()});
+                       hadoopConf.setBoolean(disableCacheName, true);
+                       for (String key : fsConfig.keySet()) {
+                               hadoopConf.set(key, fsConfig.getString(key, null));
+                       }
+               }
+
+               fs = new Path(basePath).getFileSystem(hadoopConf);
+       }
+
        @Override
        public void close() throws Exception {
                for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
@@ -456,8 +506,6 @@ public class BucketingSink<T>
        private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
                closeCurrentPartFile(bucketState);
 
-               FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
-
                if (!fs.exists(bucketPath)) {
                        try {
                                if (fs.mkdirs(bucketPath)) {
@@ -511,7 +559,6 @@ public class BucketingSink<T>
                        Path currentPartPath = new Path(bucketState.currentFile);
                        Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
                        Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-                       FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
                        fs.rename(inProgressPath, pendingPath);
                        LOG.debug("Moving in-progress bucket {} to pending file {}",
                                inProgressPath,
@@ -589,7 +636,6 @@ public class BucketingSink<T>
                                                                Path pendingPath = new Path(finalPath.getParent(),
                                                                        pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-                                                               FileSystem fs = pendingPath.getFileSystem(hadoopConf);
                                                                fs.rename(pendingPath, finalPath);
                                                                LOG.debug(
                                                                        "Moving pending file {} to final location having completed checkpoint {}.",
@@ -634,9 +680,8 @@ public class BucketingSink<T>
        public void restoreState(State<T> state) {
                this.state = state;
 
-               FileSystem fs;
                try {
-                       fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
+                       initFileSystem();
                } catch (IOException e) {
                        LOG.error("Error while creating FileSystem in checkpoint restore.", e);
                        throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e4b0460..992c8c2 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -34,11 +34,13 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -497,4 +499,92 @@ public class BucketingSinkTest {
                Assert.assertEquals(4, numFiles);
                Assert.assertEquals(2, numInProgress);
        }
+
+       /**
+        * This tests user defined hdfs configuration
+        * @throws Exception
+        */
+       @Test
+       public void testUserDefinedConfiguration() throws Exception {
+               final String outPath = hdfsURI + "/string-non-rolling-with-config";
+               final int numElements = 20;
+
+               Map<String, String> properties = new HashMap<>();
+               Schema keySchema = Schema.create(Schema.Type.INT);
+               Schema valueSchema = Schema.create(Schema.Type.STRING);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+               Configuration conf = new Configuration();
+               conf.set("io.file.buffer.size", "40960");
+
+               BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
+                       .setFSConfig(conf)
+                       .setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
+                       .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>())
+                       .setPartPrefix("part")
+                       .setPendingPrefix("")
+                       .setPendingSuffix("");
+
+               OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
+                       createTestSink(sink);
+
+               testHarness.setProcessingTime(0L);
+
+               testHarness.setup();
+               testHarness.open();
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
+                               i, "message #" + Integer.toString(i)
+                       )));
+               }
+
+               testHarness.close();
+
+               GenericData.setStringType(valueSchema, GenericData.StringType.String);
+               Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+               SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
+               DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
+               for (int i = 0; i < numElements; i++) {
+                       AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
+                               new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
+                       int key = wrappedEntry.getKey();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+       }
+
+       private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> {
+               private Map<String, String> properties;
+               private String key;
+               private String expect;
+               public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) {
+                       super(properties);
+                       this.properties = properties;
+                       this.key = key;
+                       this.expect = expect;
+               }
+
+               @Override
+               public void open(FileSystem fs, Path path) throws IOException {
+                       super.open(fs, path);
+                       Assert.assertEquals(expect, fs.getConf().get(key));
+               }
+
+               @Override
+               public Writer<Tuple2<K, V>> duplicate() {
+                       return new StreamWriterWithConfigCheck<>(properties, key, expect);
+               }
+       }
+
 }

Reply via email to