Repository: flink Updated Branches: refs/heads/master fd08ad2e7 -> e9b20ec21
[FLINK-4378] Allow Setting Custom Configuration in BucketingSink Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9b20ec2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9b20ec2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9b20ec2 Branch: refs/heads/master Commit: e9b20ec21ddd5f7d1440bccfb8622dcb47443fef Parents: f8b162e Author: wenlong.lwl <[email protected]> Authored: Thu Oct 27 11:44:41 2016 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 28 10:56:24 2016 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 63 ++++++++++++-- .../fs/bucketing/BucketingSinkTest.java | 90 ++++++++++++++++++++ 2 files changed, 144 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 52de00d..0791fb5 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -281,7 +281,15 @@ public class BucketingSink<T> */ private transient State<T> state; - private transient org.apache.hadoop.conf.Configuration hadoopConf; + /** + * User-defined FileSystem parameters + */ + private Configuration fsConfig = null; + + /** + * The FileSystem reference. + */ + private transient FileSystem fs; private transient Clock clock; @@ -302,6 +310,28 @@ public class BucketingSink<T> this.writerTemplate = new StringWriter<>(); } + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public BucketingSink<T> setFSConfig(Configuration config) { + this.fsConfig = new Configuration(); + fsConfig.addAll(config); + return this; + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) { + this.fsConfig = new Configuration(); + for(Map.Entry<String, String> entry : config) { + fsConfig.setString(entry.getKey(), entry.getValue()); + }; + return this; + } + @Override @SuppressWarnings("unchecked") public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { @@ -319,8 +349,7 @@ public class BucketingSink<T> state = new State<T>(); Path baseDirectory = new Path(basePath); - hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - FileSystem fs = baseDirectory.getFileSystem(hadoopConf); + initFileSystem(); refTruncate = reflectTruncate(fs); processingTimeService = @@ -369,6 +398,27 @@ public class BucketingSink<T> } } + /** + * create a file system with the user defined hdfs config + * @throws IOException + */ + private void initFileSystem() throws IOException { + if(fs != null) { + return; + } + org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if(fsConfig != null) { + String disableCacheName + = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()}); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : fsConfig.keySet()) { + hadoopConf.set(key, fsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + @Override public void close() throws Exception { for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { @@ -456,8 +506,6 @@ public class BucketingSink<T> private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception { closeCurrentPartFile(bucketState); - FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); - if (!fs.exists(bucketPath)) { try { if (fs.mkdirs(bucketPath)) { @@ -511,7 +559,6 @@ public class BucketingSink<T> Path currentPartPath = new Path(bucketState.currentFile); Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); - FileSystem fs = inProgressPath.getFileSystem(hadoopConf); fs.rename(inProgressPath, pendingPath); LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, @@ -589,7 +636,6 @@ public class BucketingSink<T> Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); - FileSystem fs = pendingPath.getFileSystem(hadoopConf); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location having completed checkpoint {}.", @@ -634,9 +680,8 @@ public class BucketingSink<T> public void restoreState(State<T> state) { this.state = state; - FileSystem fs; try { - fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration()); + initFileSystem(); } catch (IOException e) { LOG.error("Error while creating FileSystem in checkpoint restore.", e); throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/e9b20ec2/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index e4b0460..992c8c2 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -34,11 +34,13 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -497,4 +499,92 @@ public class BucketingSinkTest { Assert.assertEquals(4, numFiles); Assert.assertEquals(2, numInProgress); } + + /** + * This tests user defined hdfs configuration + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + final int numElements = 20; + + Map<String, String> properties = new HashMap<>(); + Schema keySchema = Schema.create(Schema.Type.INT); + Schema valueSchema = Schema.create(Schema.Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); + + BucketingSink<Tuple2<Integer,String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath) + .setFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960")) + .setBucketer(new BasePathBucketer<Tuple2<Integer,String>>()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = + createTestSink(sink); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Tuple2.of( + i, "message #" + Integer.toString(i) + ))); + } + + testHarness.close(); + + GenericData.setStringType(valueSchema, GenericData.StringType.String); + Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema); + DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader); + for (int i = 0; i < numElements; i++) { + AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry = + new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); + int key = wrappedEntry.getKey(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + + private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> { + private Map<String, String> properties; + private String key; + private String expect; + public StreamWriterWithConfigCheck(Map<String, String> properties, String key, String expect) { + super(properties); + this.properties = properties; + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + Assert.assertEquals(expect, fs.getConf().get(key)); + } + + @Override + public Writer<Tuple2<K, V>> duplicate() { + return new StreamWriterWithConfigCheck<>(properties, key, expect); + } + } + }
