Rumit Patel created FLUME-3305:
----------------------------------

             Summary: Working data pipeline using flume
                 Key: FLUME-3305
                 URL: https://issues.apache.org/jira/browse/FLUME-3305
             Project: Flume
          Issue Type: Question
            Reporter: Rumit Patel


I'm trying to introduce Flume at my current workplace and am running into some 
issues. I would like help or direction on my approach so I can work through the 
problem rather than banging my head on it.

I'm consuming messages serialized with KafkaAvroSerializer from a Kafka topic 
and need to write them to HDFS, partitioned by the eventTime defined in each 
message.

My approach is: Kafka source -> memory channel -> HDFS sink.

 
{code:java}
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = conversationAction
a1.sources.r1.kafka.consumer.group.id = flumeTest

# Using the key deserializer below blows up on KafkaSource line 226
#a1.sources.r1.kafka.consumer.key.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
#a1.sources.r1.kafka.consumer.value.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
#a1.sources.r1.kafka.consumer.schema.registry.url = http://localhost:8081
#a1.sources.r1.useFlumeEventFormat = true
#a1.sources.r1.kafka.consumer.specific.avro.reader = true

# Define data interceptor (so I can determine the partition to use for
# dropping the data)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jetblack.avro.intercept.EventInterceptor$Builder
a1.sources.r1.interceptors.i1.hitDateHeader = hitdate

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=%{hitdate}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro

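# Roll files in HDFS on a time interval or at 255MB; don't roll based on
# number of records.
# Note: rollInterval is in seconds, so 60 below rolls every minute (300 would
# be 5 min).
# We roll at 255MB because our block size is 128MB; we want 2 full blocks
# without going over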
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 267386880
a1.sinks.k1.hdfs.rollCount = 0
# Write to HDFS file in batches of 20 records
a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///actions.avsc
# The following property gives a generic Avro event; we need a custom Avro
# event, so use the 2 properties above instead
#a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

{code}
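
To make the intended partitioning concrete, here is a minimal sketch of how a `hitdate` header value could be derived from eventTime and how the sink path template above would then resolve. It is not the actual `EventInterceptor` code: the class and method names are hypothetical, and it assumes eventTime is epoch milliseconds, that the partition value is formatted as `yyyyMMdd` in UTC, and that a `topic` header is present on the event.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume or of the interceptor named in the config.
final class HitDatePartition {
    // Assumption: partition values look like 20190101 (fits an INT partition column).
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Assumption: eventTime is epoch milliseconds.
    static String hitDate(long eventTimeMillis) {
        return FMT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    // Simplified stand-in for %{header} substitution in hdfs.path;
    // it only replaces %{name} tokens with header values.
    static String resolvePath(String template, Map<String, String> headers) {
        String path = template;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            path = path.replace("%{" + e.getKey() + "}", e.getValue());
        }
        return path;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "conversationAction");
        headers.put("hitdate", hitDate(1546300800000L)); // 2019-01-01T00:00:00Z
        // prints /tmp/flume/conversationAction/hitdate=20190101
        System.out.println(resolvePath("/tmp/flume/%{topic}/hitdate=%{hitdate}", headers));
    }
}
```

With this layout, each day's events land in their own `hitdate=...` directory, which is the shape a Hive table partitioned by `hitdate` expects.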
 

 

End users need to be able to access the data using Hive queries, so the Hive 
table is created with the following syntax. 

 
{code:java}
CREATE EXTERNAL TABLE IF NOT EXISTS conversationactiontest
PARTITIONED BY (hitdate INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'file:///tmp/flume/actionsdir'
TBLPROPERTIES 
('avro.schema.url'='file:///src/main/resources/avro/actions.avsc');

{code}
 

Data files are created, but I'm noticing 2 issues:
 # When I add headers via the interceptor, the resulting Avro files are messy, 
which is not what we want.
 # Queries fail with the following error:

 
{code:java}
Failed with exception java.io.IOException:org.apache.avro.AvroRuntimeException: 
Malformed data. Length is negative: -1
{code}
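
One check that may help narrow this error down, offered as an assumption and not a confirmed cause: messages produced by the Confluent KafkaAvroSerializer carry a 5-byte prefix (a zero magic byte followed by a 4-byte big-endian schema id) in front of the Avro binary payload. If that prefix is still present in the bytes written to the data files, a reader expecting plain Avro records could mis-decode them. The sketch below uses hypothetical class and method names and only the JDK; it inspects a message body for that prefix.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for inspecting a Kafka message value / Flume event body.
final class ConfluentFraming {
    static final int HEADER_LEN = 5; // 1 magic byte + 4-byte schema id

    // Heuristic only: a plain Avro payload can also start with a zero byte.
    static boolean hasConfluentHeader(byte[] body) {
        return body != null && body.length >= HEADER_LEN && body[0] == 0;
    }

    // Reads the schema id stored big-endian in bytes 1..4.
    static int schemaId(byte[] body) {
        return ByteBuffer.wrap(body, 1, 4).getInt();
    }

    // Returns the bytes after the prefix, i.e. the Avro binary payload.
    static byte[] avroPayload(byte[] body) {
        return Arrays.copyOfRange(body, HEADER_LEN, body.length);
    }

    public static void main(String[] args) {
        // Made-up sample: magic 0, schema id 42, then two payload bytes.
        byte[] body = {0, 0, 0, 0, 42, 2, 10};
        System.out.println(hasConfluentHeader(body));      // prints true
        System.out.println(schemaId(body));                // prints 42
        System.out.println(avroPayload(body).length);      // prints 2
    }
}
```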
 

I've tried a few options but have had no luck getting to the root cause. Any 
help would be highly appreciated.

 



