[
https://issues.apache.org/jira/browse/FLINK-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266685#comment-15266685
]
ASF GitHub Bot commented on FLINK-3854:
---------------------------------------
GitHub user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1953#discussion_r61745619
--- Diff:
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
---
@@ -0,0 +1,308 @@
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+* Implementation of an Avro key-value {@link Writer} that can be used in a sink
+* such as {@code RollingSink}. Each entry is wrapped in a {@link GenericRecord}
+* with key and value fields (the same layout the Avro MapReduce library uses).
+<pre>
+Usage:
+{@code
+ RollingSink<Tuple2<Long , Long>> sink = new
RollingSink<Tuple2<Long , Long>>("/tmp/path");
+ sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
+ sink.setPendingSuffix(".avro");
+ Map<String,String> properties = new HashMap<>();
+ Schema longSchema = Schema.create(Type.LONG);
+ String keySchema = longSchema.toString();
+ String valueSchema = longSchema.toString();
+ properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA,
keySchema);
+ properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
valueSchema);
+ properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS,
Boolean.toString(true));
+ properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC,
DataFileConstants.SNAPPY_CODEC);
+
+ sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
+ sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
+}
+</pre>
+*/
+public class AvroKeyValueSinkWriter<K, V> extends
StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>,
InputTypeConfigurable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+  * Required property: the key schema as a schema string (for example the output of
+  * {@code Schema.toString()}); open() throws IllegalStateException when it is missing.
+  */
+ public static final String CONF_OUTPUT_KEY_SCHEMA =
"avro.schema.output.key";
+ /**
+  * Required property: the value schema as a schema string (for example the output of
+  * {@code Schema.toString()}); open() throws IllegalStateException when it is missing.
+  */
+ public static final String CONF_OUTPUT_VALUE_SCHEMA =
"avro.schema.output.value";
+ /**
+  * Property enabling compression, parsed with Boolean.parseBoolean; treated as
+  * false when absent. Reuses Hadoop's FileOutputFormat.COMPRESS key name.
+  */
+ public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
+ /**
+  * Property naming the Avro codec used when compression is enabled
+  * (e.g. DataFileConstants.SNAPPY_CODEC).
+  * NOTE(review): the key name is Hadoop's FileOutputFormat.COMPRESS_CODEC, but the
+  * value is resolved as an Avro codec name, not a Hadoop codec class — confirm intended.
+  */
+ public static final String CONF_COMPRESS_CODEC =
FileOutputFormat.COMPRESS_CODEC;
+ /** Deflate level used for the deflate codec; defaults to CodecFactory.DEFAULT_DEFLATE_LEVEL. */
+ public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
+ /** XZ level used for the xz codec; defaults to CodecFactory.DEFAULT_XZ_LEVEL. */
+ public static final String CONF_XZ_LEVEL = "avro.xz.level";
+
+ // Not serialized with the writer (transient); presumably (re)created in open() — TODO confirm,
+ // its initialization is outside the visible hunk.
+ private transient AvroKeyValueWriter<K, V> keyValueWriter;
+
+ // Writer configuration supplied to the constructor; consulted when a file is opened.
+ private final Map<String, String> properties;
+
+ /**
+  * Creates a writer that stores each {@code Tuple2<K, V>} as an Avro key/value record.
+  * <p>
+  * The properties configure the underlying Avro key-value writer as a simple map (see the
+  * class-level example). The key and value schemas ({@link #CONF_OUTPUT_KEY_SCHEMA},
+  * {@link #CONF_OUTPUT_VALUE_SCHEMA}) are required. Compression is controlled by
+  * {@link #CONF_COMPRESS}, {@link #CONF_COMPRESS_CODEC}, {@link #CONF_DEFLATE_LEVEL}
+  * and {@link #CONF_XZ_LEVEL}.
+  *
+  * @param properties configuration for the Avro key-value writer; must not be null
+  * @throws NullPointerException if {@code properties} is null
+  */
+ public AvroKeyValueSinkWriter(Map<String, String> properties) {
+     if (properties == null) {
+         // Fail fast here instead of with an unexplained NPE once open() reads the map.
+         throw new NullPointerException("properties must not be null");
+     }
+     this.properties = properties;
+ }
+
+ private boolean getBoolean(Map<String,String> conf, String key, boolean
def) {
+ String value = conf.get(key);
+ if (value == null) {
+ return def;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
+ /**
+  * Looks up an integer option.
+  *
+  * @param conf the configuration map to read from
+  * @param key the option name
+  * @param def the result when {@code key} has no value
+  * @return {@code def} when the key is absent; otherwise the parsed value
+  * @throws IllegalArgumentException if the stored text is not a valid integer; the message
+  *         names the offending property and the original NumberFormatException is the cause
+  */
+ private int getInt(Map<String,String> conf, String key, int def) {
+     String value = conf.get(key);
+     if (value == null) {
+         return def;
+     }
+     try {
+         return Integer.parseInt(value);
+     } catch (NumberFormatException e) {
+         // Integer.parseInt's own message omits the property name, which hides the misconfiguration.
+         throw new IllegalArgumentException(
+             "Invalid integer value '" + value + "' for property '" + key + "'", e);
+     }
+ }
+
+ /**
+  * Builds the Avro {@link CodecFactory} selected by the compression properties.
+  * <p>
+  * Derived from Avro's {@code AvroOutputFormatBase.getCompressionCodec(..)}:
+  * <ul>
+  * <li>{@link #CONF_COMPRESS} false or absent: no compression ({@link CodecFactory#nullCodec()}).</li>
+  * <li>{@link #CONF_COMPRESS_CODEC} absent or {@link DataFileConstants#DEFLATE_CODEC}: deflate at
+  *     {@link #CONF_DEFLATE_LEVEL} (absent codec defaults to deflate, as in the M/R output format).</li>
+  * <li>{@link DataFileConstants#XZ_CODEC}: xz at {@link #CONF_XZ_LEVEL}.</li>
+  * <li>anything else: resolved by name with {@link CodecFactory#fromString(String)}, so it must be
+  *     an Avro codec name such as {@link DataFileConstants#SNAPPY_CODEC}.</li>
+  * </ul>
+  * Level properties are parsed only for the codec that uses them.
+  *
+  * @param conf the writer properties
+  * @return the codec to use for the Avro data file
+  */
+ private CodecFactory getCompressionCodec(Map<String,String> conf) {
+     if (!getBoolean(conf, CONF_COMPRESS, false)) {
+         return CodecFactory.nullCodec();
+     }
+
+     String outputCodec = conf.get(CONF_COMPRESS_CODEC);
+
+     if (outputCodec == null || DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+         // Compression requested without naming a codec: default to deflate like
+         // AvroOutputFormatBase, rather than passing null to CodecFactory.fromString (which rejects it).
+         int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
+         return CodecFactory.deflateCodec(deflateLevel);
+     } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
+         int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
+         return CodecFactory.xzCodec(xzLevel);
+     } else {
+         return CodecFactory.fromString(outputCodec);
+     }
+ }
+
+ @Override
+ public void open(FileSystem fs, Path path) throws IOException {
+ super.open(fs, path);
+
+ CodecFactory compressionCodec = getCompressionCodec(properties);
+
+ String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
+ if (keySchemaString == null) {
+ throw new IllegalStateException("No key schema
provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
+ }
+ @SuppressWarnings("deprecation")
+ Schema keySchema = Schema.parse(keySchemaString);
+
+ String valueSchemaString =
properties.get(CONF_OUTPUT_VALUE_SCHEMA);
+ if (valueSchemaString == null) {
+ throw new IllegalStateException("No value schema
provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
--- End diff --
Same here.
> Support Avro key-value rolling sink writer
> ------------------------------------------
>
> Key: FLINK-3854
> URL: https://issues.apache.org/jira/browse/FLINK-3854
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.0.3
> Reporter: Igor Berman
>
> Support a rolling sink writer in Avro key-value format,
> preferably without additional classpath dependencies,
> and preferably in the same format as M/R jobs for backward compatibility.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)