[ 
https://issues.apache.org/jira/browse/FLINK-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266685#comment-15266685
 ] 

ASF GitHub Bot commented on FLINK-3854:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1953#discussion_r61745619
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---
    @@ -0,0 +1,308 @@
    +package org.apache.flink.streaming.connectors.fs;
    +
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.Map;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.CodecFactory;
    +import org.apache.avro.file.DataFileConstants;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +
    +/**
    +* Implementation of an Avro key/value writer that can be used in a Sink.
    +* Each entry is wrapped in a GenericRecord with key/value fields (same as in the M/R lib).
    +<pre>
    +Usage:
    +{@code
    +           RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("/tmp/path");
    +           sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
    +           sink.setPendingSuffix(".avro");
    +           Map<String,String> properties = new HashMap<>();
    +           Schema longSchema = Schema.create(Type.LONG);
    +           String keySchema = longSchema.toString();
    +           String valueSchema = longSchema.toString();
    +           properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
    +           properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
    +           properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
    +           properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
    +           
    +           sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));
    +           sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
    +}
    +</pre>
    +*/
    +public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
    +   private static final long serialVersionUID = 1L;
    +   public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
    +   public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
    +   public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
    +   public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
    +   public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
    +   public static final String CONF_XZ_LEVEL = "avro.xz.level";
    +
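    +   // Transient: the underlying Avro writer is not serialized with the sink and is re-created when the writer is opened.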
    +   private transient AvroKeyValueWriter<K, V> keyValueWriter;
    +
    +   private final Map<String, String> properties;
    +
    +   /**
    +    * Constructor for the writer.
    +    * <p>
    +    * You can provide different properties that will be used to configure the Avro key-value writer as a simple properties map (see the example above).
    +    * @param properties the properties used to configure the Avro key-value writer
    +    */
    +   public AvroKeyValueSinkWriter(Map<String, String> properties) {
    +           this.properties = properties;
    +   }
    +
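    +   // Typed accessors over the plain properties map, mirroring Hadoop Configuration's getBoolean/getInt defaulting.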
    +   private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
    +           String value = conf.get(key);
    +           if (value == null) {
    +                   return def;
    +           }
    +           return Boolean.parseBoolean(value);
    +   }
    +   
    +   private int getInt(Map<String,String> conf, String key, int def) {
    +           String value = conf.get(key);
    +           if (value == null) {
    +                   return def;
    +           }
    +           return Integer.parseInt(value);
    +   }
    +
    +   // This is derived from AvroOutputFormatBase.getCompressionCodec(..)
    +   private CodecFactory getCompressionCodec(Map<String,String> conf) {
    +           if (getBoolean(conf, CONF_COMPRESS, false)) {
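    +                   // Compression is enabled: pick the codec named by CONF_COMPRESS_CODEC; deflate and xz honor the configured levels.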
    +                   int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
    +                   int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
    +
    +                   String outputCodec = conf.get(CONF_COMPRESS_CODEC);
    +
    +                   if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
    +                           return CodecFactory.deflateCodec(deflateLevel);
    +                   } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
    +                           return CodecFactory.xzCodec(xzLevel);
    +                   } else {
    +                           return CodecFactory.fromString(outputCodec);
    +                   }
    +           }
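    +           // Compression disabled (or CONF_COMPRESS unset): fall back to the null codec, i.e. no compression.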
    +           return CodecFactory.nullCodec();
    +   }
    +
    +   @Override
    +   public void open(FileSystem fs, Path path) throws IOException {
    +           super.open(fs, path);
    +
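    +           // Resolve the compression codec and the key/value schemas from the supplied properties.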
    +           CodecFactory compressionCodec = getCompressionCodec(properties);
    +
    +           String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
    +           if (keySchemaString == null) {
    +                   throw new IllegalStateException("No key schema 
provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
    +           }
    +           @SuppressWarnings("deprecation")
    +           Schema keySchema = Schema.parse(keySchemaString);
    +           
    +           String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
    +           if (valueSchemaString == null) {
    +                   throw new IllegalStateException("No value schema 
provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
    --- End diff --
    
    Same here.


> Support Avro key-value rolling sink writer
> ------------------------------------------
>
>                 Key: FLINK-3854
>                 URL: https://issues.apache.org/jira/browse/FLINK-3854
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>            Reporter: Igor Berman
>
> Support a rolling sink writer in Avro key-value format,
> preferably without additional classpath dependencies, and
> preferably in the same format as M/R jobs, for backward compatibility.
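
For illustration only (not part of the PR): a minimal sketch of reading such a file back with Avro's generic reader, assuming long key/value schemas as in the Javadoc example above. The file path and class name here are hypothetical.

    import java.io.File;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class AvroKeyValueReadBack {
        public static void main(String[] args) throws Exception {
            // Hypothetical path to a file produced by the rolling sink.
            File file = new File("/tmp/path/part-0-0.avro");
            // The reader picks up the writer schema embedded in the file header.
            try (DataFileReader<GenericRecord> reader =
                    new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
                for (GenericRecord entry : reader) {
                    // Each entry is a GenericRecord with "key" and "value" fields,
                    // the same layout Hadoop's Avro key-value output format uses.
                    System.out.println(entry.get("key") + " -> " + entry.get("value"));
                }
            }
        }
    }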



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
