That does it. Looks like it's gone undocumented.

-roshan

________________________________________
From: Hari Shreedharan <[email protected]>
Sent: Wednesday, January 28, 2015 10:29 PM
To: [email protected]
Cc: [email protected]
Subject: Re: Puzzled with Avro serializer + HDFS Sink
Take a look at the AvroEventSerializer:
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java

You’d need to specify the schema in a file and have that added to the event headers.

Thanks,
Hari

On Wed, Jan 28, 2015 at 9:45 PM, Roshan Naik <[email protected]> wrote:

> If Flume is fed an Avro file via the SpoolDir source and the data is then
> drained into HDFS using the Avro serializer, I am noticing that what ends
> up in HDFS is an Avro file with the below schema:
>
> {
>   "type" : "record",
>   "name" : "Event",
>   "fields" : [ {
>     "name" : "headers",
>     "type" : {
>       "type" : "map",
>       "values" : "string"
>     }
>   }, {
>     "name" : "body",
>     "type" : "bytes"
>   } ]
> }
>
> Basically, each record is not the original Avro record; instead it is the
> Flume event object (i.e. headers + body), and the body part of it has the
> original Avro record embedded. If the event header has a schema, then the
> schema ends up in each record of the output file.
>
> AFAICT, this is undesirable. What is desired is to recreate the original
> file contents.
>
> *My question* is how to configure Flume so that the original records are
> written to the destination file instead of being wrapped in FlumeEvent
> objects.
>
> *Details:* I attached a debugger to the running agent and saw that the
> spooldir source is deserializing the Avro file correctly into a Flume
> event (header with schema, body with Avro datum). But on the HDFS sink
> side, the Avro serializer is writing out the *whole* FlumeEvent
> object... instead of serializing just the event body.
>
> Below is the config I used:
>
> agent.channels = memoryChannel
> agent.channels.memoryChannel.type = memory
>
> agent.sources = sd
> *agent.sources.sd.type = spooldir*
> agent.sources.sd.spoolDir = /tmp/avro
> *agent.sources.sd.deserializer = AVRO*
> *agent.sources.sd.deserializer.schemaType = LITERAL*
> agent.sources.sd.channels = memoryChannel
>
> agent.sinks = hdfsSink
> agent.sinks.hdfsSink.type = hdfs
> agent.sinks.hdfsSink.channel = memoryChannel
> agent.sinks.hdfsSink.hdfs.path = /tmp/flumetest/avro
> agent.sinks.hdfsSink.hdfs.fileType = DataStream
> agent.sinks.hdfsSink.serializer = avro_event
> agent.sinks.hdfsSink.hdfs.callTimeout = 5000
>
> -roshan
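[Editor's note: a sketch of the sink-side change Hari describes, for reference. It swaps the generic avro_event serializer (which wraps each record in a FlumeEvent headers+body envelope) for the schema-aware AvroEventSerializer linked above, which picks up the schema from the event headers (flume.avro.schema.literal or flume.avro.schema.url) and writes the original Avro records directly. The `$Builder` class reference follows Flume's usual custom-serializer convention; verify the exact class name against the Flume version in use.]

```properties
# Source side unchanged: the spooldir AVRO deserializer with
# schemaType = LITERAL already puts the schema into the
# flume.avro.schema.literal event header.

# Sink side: use the schema-aware serializer instead of avro_event,
# so records are written as plain Avro datums, not FlumeEvent wrappers.
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = /tmp/flumetest/avro
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 5000
agent.sinks.hdfsSink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
```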
