Rumit Patel created FLUME-3305:
----------------------------------
Summary: Working data pipeline using flume
Key: FLUME-3305
URL: https://issues.apache.org/jira/browse/FLUME-3305
Project: Flume
Issue Type: Question
Reporter: Rumit Patel
I'm trying to introduce Flume at my current workplace and am running into some
issues. I would like some help/direction on my approach to the problem so I
can work through it rather than keep banging my head against it.
I'm consuming messages serialized with KafkaAvroSerializer from a Kafka topic and need to
land them in HDFS, partitioned based on the eventTime defined in the message.
My approach is (Kafka source -> memory channel -> HDFS sink):
{code:java}
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = conversationAction
a1.sources.r1.kafka.consumer.group.id = flumeTest
#a1.sources.r1.kafka.consumer.key.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer (using this blows up on KafkaSource line 226)
#a1.sources.r1.kafka.consumer.value.deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
#a1.sources.r1.kafka.consumer.schema.registry.url = http://localhost:8081
#a1.sources.r1.useFlumeEventFormat = true
#a1.sources.r1.kafka.consumer.specific.avro.reader = true
# define data interceptor (so I can determine the partition to use for dropping the data)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jetblack.avro.intercept.EventInterceptor$Builder
a1.sources.r1.interceptors.i1.hitDateHeader = hitdate
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=%{hitdate}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro
# Roll files in HDFS every 60 seconds or at 255 MB; don't roll based on record count
# We roll at 255 MB because our block size is 128 MB and we want two full blocks without going over
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 267386880
a1.sinks.k1.hdfs.rollCount = 0
# Write to HDFS file in batches of 20 records
a1.sinks.k1.hdfs.batchSize = 20
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///actions.avsc
# The following property gives a generic Avro event; we need a custom Avro event, so use the two properties above instead
#a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
{code}
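To illustrate the partitioning step, here is a simplified sketch of the kind of interceptor in play (not the exact class; it assumes the event time is available as an epoch-millis timestamp header on the event and formats it as yyyyMMdd into the configured hitdate header):
{code:java}
package com.jetblack.avro.intercept;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Simplified sketch only, not the exact production class.
// Assumes the event time is already present as an epoch-millis "timestamp" header.
public class EventInterceptor implements Interceptor {

  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  private final String hitDateHeader;

  private EventInterceptor(String hitDateHeader) {
    this.hitDateHeader = hitDateHeader;
  }

  @Override public void initialize() { }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    String ts = headers.get("timestamp"); // assumed to carry the message eventTime
    if (ts != null) {
      headers.put(hitDateHeader, FMT.format(Instant.ofEpochMilli(Long.parseLong(ts))));
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override public void close() { }

  public static class Builder implements Interceptor.Builder {
    private String hitDateHeader = "hitdate";

    @Override
    public void configure(Context context) {
      hitDateHeader = context.getString("hitDateHeader", "hitdate");
    }

    @Override
    public Interceptor build() {
      return new EventInterceptor(hitDateHeader);
    }
  }
}
{code}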
End users need to be able to access the data using Hive queries, so the Hive table
is created with the following syntax.
{code:java}
CREATE EXTERNAL TABLE IF NOT EXISTS conversationactiontest
PARTITIONED BY (hitdate INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'file:///tmp/flume/actionsdir'
TBLPROPERTIES
('avro.schema.url'='file:///src/main/resources/avro/actions.avsc');
{code}
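Since the table is partitioned by hitdate, Hive does not see the directories Flume creates until the partitions are registered. Assuming the hitdate=... directories end up under the table's LOCATION, something like the following is run after data lands (the 20181015 value is just an example date):
{code:java}
-- Hypothetical example: register one partition written by Flume
ALTER TABLE conversationactiontest ADD IF NOT EXISTS PARTITION (hitdate=20181015);

-- or let Hive discover all partition directories under LOCATION
MSCK REPAIR TABLE conversationactiontest;
{code}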
It seems to be creating data files, however I'm noticing two issues:
# When I add headers via the interceptor, the resulting Avro files come out messy, which is not what we want.
# Queries fail with the following error, which is very frustrating:
{code:java}
Failed with exception java.io.IOException:org.apache.avro.AvroRuntimeException:
Malformed data. Length is negative: -1
{code}
I've tried a few options but have had no luck yet getting to the root cause. Any help
would be highly appreciated.
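For reference on the commented-out deserializer settings above: messages produced by the Confluent KafkaAvroSerializer carry a 5-byte prefix (a magic byte plus a 4-byte schema-registry ID) ahead of the Avro-encoded body. A minimal sketch of unwrapping that prefix manually, assuming the Confluent wire format and that the matching writer schema is loaded separately:
{code:java}
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Sketch only: assumes the message bytes follow the Confluent wire format
// (magic byte 0x0, 4-byte schema ID, then the Avro-encoded record).
public final class WireFormatPeek {

  public static GenericRecord decode(byte[] message, Schema writerSchema) throws Exception {
    ByteBuffer buf = ByteBuffer.wrap(message);
    byte magic = buf.get();                 // 0x0 for the Confluent wire format
    if (magic != 0) {
      throw new IllegalArgumentException("Not Confluent wire format, magic=" + magic);
    }
    int schemaId = buf.getInt();            // schema-registry ID (not used in this sketch)
    BinaryDecoder decoder = DecoderFactory.get()
        .binaryDecoder(message, buf.position(), message.length - buf.position(), null);
    return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
  }

  private WireFormatPeek() { }
}
{code}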