Repository: flink Updated Branches: refs/heads/master b5e2e3637 -> c532638a0
[FLINK-3854] Support Avro key-value rolling sink writer This closes #1953 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c532638a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c532638a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c532638a Branch: refs/heads/master Commit: c532638a010d75f97184314ee86fd070168c6837 Parents: b5e2e36 Author: Igor Berman <[email protected]> Authored: Sat Apr 30 17:54:52 2016 +0300 Committer: Robert Metzger <[email protected]> Committed: Thu Jun 9 13:34:06 2016 +0200 ---------------------------------------------------------------------- .../connectors/fs/AvroKeyValueSinkWriter.java | 309 +++++++++++++++++++ .../connectors/fs/RollingSinkITCase.java | 147 +++++++++ 2 files changed, 456 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c532638a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java new file mode 100644 index 0000000..a8919c3 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -0,0 +1,309 @@ +package org.apache.flink.streaming.connectors.fs; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * A sink {@link Writer} that stores {@code Tuple2<K, V>} elements in an Avro container file.
+ *
+ * <p>Each element is wrapped in a {@link GenericRecord} with a {@code key} and a {@code value}
+ * field (the same record layout the Avro map/reduce library uses, see {@link AvroKeyValue}).
+ *
+ * <p>The writer is configured through a plain {@code Map<String, String>}. The key and value
+ * schemas are mandatory. Compression is off unless {@link #CONF_COMPRESS} is {@code "true"};
+ * when it is on and no {@link #CONF_COMPRESS_CODEC} is given, deflate is used.
+ *
+ * <p>Usage:
+ * <pre>{@code
+ * RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("/tmp/path");
+ * sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
+ * sink.setPendingSuffix(".avro");
+ * Map<String, String> properties = new HashMap<>();
+ * Schema longSchema = Schema.create(Type.LONG);
+ * String keySchema = longSchema.toString();
+ * String valueSchema = longSchema.toString();
+ * properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
+ * properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
+ * properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
+ * properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+ *
+ * sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
+ * sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
+ * }</pre>
+ *
+ * @param <K> type of the tuple's first field, written as the record key
+ * @param <V> type of the tuple's second field, written as the record value
+ */
+public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+	private static final long serialVersionUID = 1L;
+
+	/** Property holding the Avro schema (as a JSON string) of the key. Mandatory. */
+	public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
+
+	/** Property holding the Avro schema (as a JSON string) of the value. Mandatory. */
+	public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
+
+	/** Property switching compression on ({@code "true"}) or off; treated as off when absent. */
+	public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
+
+	/** Property naming the Avro codec to use when compression is switched on. */
+	public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
+
+	/** Property holding the deflate compression level (integer). */
+	public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
+
+	/** Property holding the xz compression level (integer). */
+	public static final String CONF_XZ_LEVEL = "avro.xz.level";
+
+	/** Created in {@link #open(FileSystem, Path)}; transient, so it is not shipped with the writer. */
+	private transient AvroKeyValueWriter<K, V> keyValueWriter;
+
+	/** Configuration as handed to the constructor; the map is stored by reference, not copied. */
+	private final Map<String, String> properties;
+
+	/**
+	 * Creates a writer configured by a simple properties map (see the class-level example).
+	 *
+	 * <p>Both schemas are parsed once here so that an invalid schema is reported when the
+	 * writer is constructed rather than when the first file is opened.
+	 *
+	 * @param properties configuration of the Avro key-value writer; must contain
+	 *                   {@link #CONF_OUTPUT_KEY_SCHEMA} and {@link #CONF_OUTPUT_VALUE_SCHEMA}
+	 * @throws IllegalStateException if the key schema or the value schema is missing
+	 */
+	@SuppressWarnings("deprecation")
+	public AvroKeyValueSinkWriter(Map<String, String> properties) {
+		this.properties = properties;
+
+		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
+		if (keySchemaString == null) {
+			throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
+		}
+		Schema.parse(keySchemaString);//verifying that schema valid
+
+		String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
+		if (valueSchemaString == null) {
+			throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
+		}
+		Schema.parse(valueSchemaString);//verifying that schema valid
+	}
+
+	/** Returns the boolean stored under {@code key}, or {@code def} when the key is absent. */
+	private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
+		String value = conf.get(key);
+		if (value == null) {
+			return def;
+		}
+		return Boolean.parseBoolean(value);
+	}
+
+	/**
+	 * Returns the integer stored under {@code key}, or {@code def} when the key is absent.
+	 *
+	 * @throws NumberFormatException if the stored value is not a parsable integer
+	 */
+	private int getInt(Map<String,String> conf, String key, int def) {
+		String value = conf.get(key);
+		if (value == null) {
+			return def;
+		}
+		return Integer.parseInt(value);
+	}
+
+	/**
+	 * Selects the Avro codec described by the configuration.
+	 *
+	 * <p>Derived from AvroOutputFormatBase.getCompressionCodec(..) of the Avro map/reduce library.
+	 *
+	 * @param conf the writer configuration
+	 * @return the null codec when {@link #CONF_COMPRESS} is absent or not {@code "true"};
+	 *         otherwise the codec named by {@link #CONF_COMPRESS_CODEC}, or deflate when no
+	 *         codec is named
+	 */
+	private CodecFactory getCompressionCodec(Map<String,String> conf) {
+		if (getBoolean(conf, CONF_COMPRESS, false)) {
+			int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
+			int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
+
+			String outputCodec = conf.get(CONF_COMPRESS_CODEC);
+
+			// Compression requested without a codec: default to deflate instead of handing
+			// null to CodecFactory.fromString(..), which rejects it as an unrecognized codec.
+			if (outputCodec == null || DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+				return CodecFactory.deflateCodec(deflateLevel);
+			} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
+				return CodecFactory.xzCodec(xzLevel);
+			} else {
+				return CodecFactory.fromString(outputCodec);
+			}
+		}
+		return CodecFactory.nullCodec();
+	}
+
+	/**
+	 * Opens the underlying stream and starts a new Avro container file on it, using the
+	 * configured schemas and codec.
+	 */
+	@Override
+	@SuppressWarnings("deprecation")
+	public void open(FileSystem fs, Path path) throws IOException {
+		super.open(fs, path);
+
+		CodecFactory compressionCodec = getCompressionCodec(properties);
+		Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
+		Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
+		keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream());
+	}
+
+	/** Closes the underlying stream first and the Avro writer (if one was created) second. */
+	@Override
+	public void close() throws IOException {
+		super.close();//the order is important since super.close flushes inside
+		if (keyValueWriter != null) {
+			keyValueWriter.close();
+		}
+	}
+
+	/**
+	 * Syncs the Avro writer (if one was created) and then flushes the underlying stream.
+	 *
+	 * @return whatever the superclass {@code flush()} returns
+	 */
+	@Override
+	public long flush() throws IOException {
+		if (keyValueWriter != null) {
+			keyValueWriter.sync();
+		}
+		return super.flush();
+	}
+
+	/** Appends the tuple as one key/value record; {@code f0} is the key and {@code f1} the value. */
+	@Override
+	public void write(Tuple2<K, V> element) throws IOException {
+		getStream(); // Throws if the stream is not open
+		keyValueWriter.write(element.f0, element.f1);
+	}
+
+	/**
+	 * Verifies that the sink input is a tuple type of arity 2. The field types themselves
+	 * are not checked against the configured schemas.
+	 *
+	 * @throws IllegalArgumentException if the type is not a tuple type or its arity is not 2
+	 */
+	@Override
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (!type.isTupleType()) {
+			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
+		}
+
+		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+		if (tupleType.getArity() != 2) {
+			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
+		}
+	}
+
+	/** Returns a new, unopened writer that uses the same properties map instance. */
+	@Override
+	public Writer<Tuple2<K, V>> duplicate() {
+		return new AvroKeyValueSinkWriter<K, V>(properties);
+	}
+
+	// taken from m/r avro lib to remove dependency on it
+	private static final class AvroKeyValueWriter<K, V> {
+		/** A writer for the Avro container file. */
+		private final DataFileWriter<GenericRecord> mAvroFileWriter;
+
+		/**
+		 * The writer schema for the generic record entries of the Avro
+		 * container file.
+		 */
+		private final Schema mKeyValuePairSchema;
+
+		/**
+		 * A reusable Avro generic record for writing key/value pairs to the
+		 * file.
+		 */
+		private final AvroKeyValue<Object, Object> mOutputRecord;
+
+		/**
+		 * Starts an Avro container file on {@code outputStream}.
+		 *
+		 * @param keySchema        schema of the record's key field
+		 * @param valueSchema      schema of the record's value field
+		 * @param compressionCodec codec set on the container file writer
+		 * @param outputStream     stream the container file is created on
+		 * @param syncInterval     sync interval set on the container file writer
+		 */
+		AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+				CodecFactory compressionCodec, OutputStream outputStream,
+				int syncInterval) throws IOException {
+			// Create the generic record schema for the key/value pair.
+			mKeyValuePairSchema = AvroKeyValue
+					.getSchema(keySchema, valueSchema);
+
+			// Create an Avro container file and a writer to it.
+			DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
+					mKeyValuePairSchema);
+			mAvroFileWriter = new DataFileWriter<GenericRecord>(
+					genericDatumWriter);
+			mAvroFileWriter.setCodec(compressionCodec);
+			mAvroFileWriter.setSyncInterval(syncInterval);
+			mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
+
+			// Create a reusable output record.
+			mOutputRecord = new AvroKeyValue<Object, Object>(
+					new GenericData.Record(mKeyValuePairSchema));
+		}
+
+		/** Same as the five-argument constructor, with Avro's default sync interval. */
+		AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+				CodecFactory compressionCodec, OutputStream outputStream)
+				throws IOException {
+			this(keySchema, valueSchema, compressionCodec, outputStream,
+					DataFileConstants.DEFAULT_SYNC_INTERVAL);
+		}
+
+		/** Stores the pair in the reusable record and appends that record to the file. */
+		void write(K key, V value) throws IOException {
+			mOutputRecord.setKey(key);
+			mOutputRecord.setValue(value);
+			mAvroFileWriter.append(mOutputRecord.get());
+		}
+
+		/** Closes the Avro container file writer. */
+		void close() throws IOException {
+			mAvroFileWriter.close();
+		}
+
+		/** Delegates to {@link DataFileWriter#sync()} and returns its result. */
+		long sync() throws IOException {
+			return mAvroFileWriter.sync();
+		}
+	}
+
+	// taken from AvroKeyValue avro-mapr lib
+	public static class AvroKeyValue<K, V> {
+		/** The name of the key value pair generic record. */
+		public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
+
+		/** The namespace of the key value pair generic record. */
+		public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
+
+		/** The name of the generic record field containing the key. */
+		public static final String KEY_FIELD = "key";
+
+		/** The name of the generic record field containing the value. */
+		public static final String VALUE_FIELD = "value";
+
+		/** The key/value generic record wrapped by this class. */
+		public final GenericRecord mKeyValueRecord;
+
+		/**
+		 * Wraps a GenericRecord that is a key value pair.
+		 */
+		public AvroKeyValue(GenericRecord keyValueRecord) {
+			mKeyValueRecord = keyValueRecord;
+		}
+
+		/** Returns the wrapped record itself (not a copy). */
+		public GenericRecord get() {
+			return mKeyValueRecord;
+		}
+
+		/** Stores {@code key} in the record's {@code key} field. */
+		public void setKey(K key) {
+			mKeyValueRecord.put(KEY_FIELD, key);
+		}
+
+		/** Stores {@code value} in the record's {@code value} field. */
+		public void setValue(V value) {
+			mKeyValueRecord.put(VALUE_FIELD, value);
+		}
+
+		/** Returns the content of the {@code key} field, cast unchecked to {@code K}. */
+		@SuppressWarnings("unchecked")
+		public K getKey() {
+			return (K) mKeyValueRecord.get(KEY_FIELD);
+		}
+
+		/** Returns the content of the {@code value} field, cast unchecked to {@code V}. */
+		@SuppressWarnings("unchecked")
+		public V getValue() {
+			return (V) mKeyValueRecord.get(VALUE_FIELD);
+		}
+
+		/**
+		 * Creates a KeyValuePair generic record schema.
+		 *
+		 * @param keySchema   schema of the 'key' field
+		 * @param valueSchema schema of the 'value' field
+		 * @return A schema for a generic record with two fields: 'key' and
+		 *         'value'.
+		 */
+		public static Schema getSchema(Schema keySchema, Schema valueSchema) {
+			Schema schema = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME,
+					"A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
+			schema.setFields(Arrays.asList(new Schema.Field(KEY_FIELD,
+					keySchema, "The key", null), new Schema.Field(VALUE_FIELD,
+					valueSchema, "The value", null)));
+			return schema;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c532638a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 9770f41..b6c5bdb 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -17,6 +17,16 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -25,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
@@ -48,6 +59,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Tests for {@link RollingSink}. These
@@ -296,6 +309,140 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		reader.close();
 		inStream.close();
 	}
+
+
+	/**
+	 * This tests {@link AvroKeyValueSinkWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Type.INT);
+		Schema valueSchema = Schema.create(Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
+				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source.addSink(sink);
+
+		env.execute("RollingSink Avro KeyValue Writer Test");
+
+		GenericData.setStringType(valueSchema, StringType.String);
+		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link AvroKeyValueSinkWriter} with non-rolling output and with compression.
+	 * It writes to its own directory so it cannot pick up the uncompressed test's part files.
+	 */
+	@Test
+	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/avro-kv-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Type.INT);
+		Schema valueSchema = Schema.create(Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
+				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source.addSink(sink);
+
+		env.execute("RollingSink Avro KeyValue Writer Test");
+
+		GenericData.setStringType(valueSchema, StringType.String);
+		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
+		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
+			int key = wrappedEntry.getKey().intValue();
+			Assert.assertEquals(i, key);
+			String value = wrappedEntry.getValue();
+			Assert.assertEquals("message #" + i, value);
+		}
+
+		dataFileStream.close();
+		inStream.close();
+	}
+
 
 	// we use this to synchronize the clock changes to elements being processed
 	final static MultiShotLatch latch1 = new MultiShotLatch();
