[ https://issues.apache.org/jira/browse/FLUME-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ramgopal N updated FLUME-3138:
------------------------------
    Issue Type: Bug  (was: Question)

> SchemaURL from flume configuration is dropping the flume events expecting the schema url to be added in event header as against FLUME-2810
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3138
>                 URL: https://issues.apache.org/jira/browse/FLUME-3138
>             Project: Flume
>          Issue Type: Bug
>          Components: Configuration
>    Affects Versions: 1.7.0
>         Environment: Flume1.7
>            Reporter: Ramgopal N
>
> I have avro data coming to kafka topic. Flume reads the events from kafka and
> then using kite dataset with hdfs sink is put into HDFS as parquet data.
>
> Flume config is as below:
>
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
> agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
> agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> agent.sinks.k1.hdfs.filePrefix=parquetdata
> agent.sinks.k1.hdfs.fileSuffix = .parquet
> agent.sinks.k1.hdfs.fileType=DataStream
> #agent.sinks.k1.hdfs.rollInterval=30
> #agent.sinks.k1.hdfs.rollCount=1
> #agent.sinks.k1.hdfs.batchSize=1
> agent.sinks.k1.kite.batchSize=2
> agent.sinks.k1.kite.rollInterval=30
> agent.sinks.k1.kite.flushable.commitOnBatch=true
> #agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
> #agent.sinks.k1.serializer.compressionCodec = snappy
> agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
>
> I am getting the below exception in the flume logs:
>
> 2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
> 2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
>         at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
>         at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
>         at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
>         at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
>         at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
>         at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
>         at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)
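
A possible workaround, sketched under assumptions and not tested against this setup: the exception says the Kite AvroParser looks for the schema in the event headers (flume.avro.schema.url or flume.avro.schema.literal), so a static interceptor on the source could stamp that header onto every event. The source name r1 and the interceptor name i1 are hypothetical, since the source section of the configuration is not shown in the report; the schema path is the one from the serializer.schemaURL line.

```properties
# Hypothetical source name "r1": substitute the Kafka source actually defined for this agent.
agent.sources.r1.interceptors = i1
# The static interceptor adds one fixed header (key/value) to every event it sees.
agent.sources.r1.interceptors.i1.type = static
agent.sources.r1.interceptors.i1.key = flume.avro.schema.url
# Assumption: the same schema file referenced by serializer.schemaURL in the report.
agent.sources.r1.interceptors.i1.value = hdfs://namenodeHA/kite/item.avsc
```

This only addresses the missing header named in the error message; it does not change how the sink treats serializer.schemaURL, which is the behaviour the issue itself is about.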