Santosh Balasubramanya created FLUME-3047:
---------------------------------------------

             Summary: Avro Sink HDFS with org.apache.flume.sink.hdfs.AvroEventSerializer$Builder not working
                 Key: FLUME-3047
                 URL: https://issues.apache.org/jira/browse/FLUME-3047
             Project: Flume
          Issue Type: Bug
          Components: Client SDK, Sinks+Sources
    Affects Versions: v1.7.0
            Reporter: Santosh Balasubramanya
            Priority: Blocker


With the configuration below, Avro messages from the Kafka topic are pulled and
written to HDFS successfully. However, deserializing the resulting file with
avro-tools (java -jar avro-tools-1.8.1.jar tojson FlumeData.1484909338012 >
flume) throws an exception.
The Flume configuration and the Avro-related files are below.


 agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
 agent1.sources.kafka-source.zookeeperConnect = machinemae:2181
 agent1.sources.kafka-source.topic = unverified
 agent1.sources.kafka-source.groupId = flume
 agent1.sources.kafka-source.channels = memory-channel
 agent1.sources.kafka-source.interceptors = i1
 agent1.sources.kafka-source.interceptors.i1.type = timestamp
 agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
 #agent1.sources.kafka-source.useFlumeEventFormat = true
 agent1.channels.memory-channel.type = memory
 agent1.channels.memory-channel.capacity = 10000
 agent1.channels.memory-channel.transactionCapacity = 1000

 agent1.sinks.hdfs-sink.type = hdfs
 agent1.sinks.hdfs-sink.hdfs.fileSuffix=.avro
 agent1.sinks.hdfs-sink.hdfs.path = /company/jar/source/gu33/s4/1.35/%{topic}/%y-%m-%d
 agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
 agent1.sinks.hdfs-sink.hdfs.rollSize = 0
 agent1.sinks.hdfs-sink.hdfs.rollCount = 0
 agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
 #agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
 agent1.sinks.hdfs-sink.channel = memory-channel
 #agent1.sinks.hdfs-sink.serializer = avro_event
 agent1.sinks.hdfs-sink.serializer.compressionCodec = snappy
 
 agent1.sinks.hdfs-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
 agent1.sinks.hdfs-sink.serializer.schemaURL = hdfs://machinemane:9000/ca/gu33.avsc

 agent1.sources = kafka-source
 agent1.channels = memory-channel
 agent1.sinks = hdfs-sink
############################

This was run with the following Avro schema and message.
Avro schema:
{
  "type" : "record",
  "name" : "xmenHeader",
  "namespace" : "com.company.xmen",
  "fields" : [ {
    "name" : "header",
    "type" : {
      "type" : "record",
      "name" : "header",
      "fields" : [ {
        "name" : "tenant_id",
        "type" : [ "null", "string" ],
        "default" : "null"
      }, {
        "name" : "doc_type_id",
        "type" : [ "null", "string" ],
        "default" : "null"
      }, {
        "name" : "unique_id",
        "type" : [ "null", "string" ],
        "default" : "null"
      }, {
        "name" : "doc_type_version",
        "type" : [ "null", "string" ],
        "default" : "null"
      }, {
        "name" : "product_id",
        "type" : [ "null", "string" ],
        "default" : "null"
      } ]
    }
  }, {
    "name" : "body",
    "type" : {
      "type" : "record",
      "name" : "body",
      "fields" : [ {
        "name" : "name",
        "type" : [ "null", {
          "type" : "record",
          "name" : "name_name_0",
          "fields" : [ {
            "name" : "app_id",
            "type" : [ "null", "string" ],
            "default" : "null"
          } ]
        } ],
        "default" : "null"
      } ]
    }
  } ]
}
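
A side note that may or may not be related to the exception: per the Avro 1.8
specification, the default value of a union-typed field must match the first
branch of the union, so a field typed [ "null", "string" ] would be expected to
carry the JSON value null as its default rather than the string "null". The
sketch below is only an illustration of how the schema could be checked for
this. It uses nothing but Python's json module; find_suspect_defaults is a
hypothetical helper (not part of Avro or Flume), and the product_id field with
a JSON null default is added purely for contrast.

```python
import json


def find_suspect_defaults(schema, path=""):
    """Walk a parsed Avro schema and collect (field_path, default) pairs for
    fields whose union type starts with "null" but whose default is not
    JSON null. Hypothetical helper, not an Avro or Flume API."""
    suspects = []
    if isinstance(schema, dict) and schema.get("type") == "record":
        for field in schema.get("fields", []):
            field_path = f"{path}.{field['name']}" if path else field["name"]
            field_type = field["type"]
            null_first_union = (
                isinstance(field_type, list)
                and len(field_type) > 0
                and field_type[0] == "null"
            )
            if (null_first_union and "default" in field
                    and field["default"] is not None):
                suspects.append((field_path, field["default"]))
            # Descend into nested records and union branches.
            suspects.extend(find_suspect_defaults(field_type, field_path))
    elif isinstance(schema, list):
        for branch in schema:
            suspects.extend(find_suspect_defaults(branch, path))
    return suspects


# Fragment modelled on the "header" record above; the second field's JSON
# null default is an assumption added for comparison.
SCHEMA_FRAGMENT = """
{
  "type": "record",
  "name": "header",
  "fields": [
    {"name": "tenant_id", "type": ["null", "string"], "default": "null"},
    {"name": "product_id", "type": ["null", "string"], "default": null}
  ]
}
"""

suspects = find_suspect_defaults(json.loads(SCHEMA_FRAGMENT))
print(suspects)
```

Run against the full gu33.avsc, a check like this would list every field whose
default is the string "null". Whether that contributes to the tojson failure is
not established here.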
 
 
Actual JSON message:
{
"header": {
  "product_id": "GU33",
  "tenant_id": "tenant_name",
  "doc_type_id": "s4",
  "doc_type_version": "1.35"
},
"body":
   {"name" :  {"app_id":"testApp_ID"}}
}
 
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
