kad3nce opened a new issue #7001: Which parser to use for Avro encoded messages from Divolte?
URL: https://github.com/apache/incubator-druid/issues/7001
 
 
   I am trying to ingest messages from Kafka which were written by Divolte.
   
   Divolte uses Avro encoding for the messages it writes to Kafka. I believe it 
builds the Avro messages in a non-standard way. This is from 
[Divolte's docs](http://divolte-releases.s3-website-eu-west-1.amazonaws.com/divolte-collector/0.9.0/userdoc/html/getting_started.html#writing-to-kafka):
   
   > ## Data in Kafka
   > Avro files on HDFS are written with the schema in the header. 
Unfortunately Kafka doesn’t really have a clear way of passing along the 
schema. For the messages on Kafka queues we expect the consumer to know the 
schema in advance, meaning that the messages that are passed onto the queue 
only contain the raw bytes of the serialized Avro record without any metadata. 
The key of each message is the party ID that for the event. Divolte Collector 
provides a small helper library to easily create Kafka consumers in Java using 
Avro’s code generation support. There is an example Kafka consumer with step by 
step instruction on getting it up and running in our usage examples repository 
here: 
https://github.com/divolte/divolte-examples/tree/master/tcp-kafka-consumer.
   
   And the messages Divolte is putting on the Kafka queue look like this:
   ```
   key: 0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J
   value: \x00\x00\x00\xB2\xED\xF0\xA2\x97Z\xDA\xCD\xF0\xA2\x97Z\x14172.25.0.1\x00\x02\x90\x02file:///Users/.../frontend/test/index.html\x02\x80\x1E\x02\xEE\n\x02\x80\x1E\x02\xB2\x12\x02V0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J\x02V0:jro4m5d8:SPubeXNlImFqTIBWRTI5Kpt5NfWZ7ic1\x02D0:UdmHBajNonE_ONddXwP2679ks0EYxySf\x1AmyCustomEvent\x02\xF0\x01Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\x02\fChrome\x02\fChrome\x02\x16Google Inc.\x02\x0EBrowser\x02\x1871.0.3578.98\x02\"Personal computer\x02\bOS X\x02\x0E10.14.0\x02(Apple Computer, Inc.
   ```
   ```
   key: 0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J
   value: \x00\x00\x00\x9A\x91\xF1\xA2\x97Z\xE6\xF3\xF0\xA2\x97Z\x14172.25.0.1\x00\x02\x90\x02file:///Users/.../frontend/test/index.html\x02\x80\x1E\x02\xEE\n\x02\x80\x1E\x02\xB2\x12\x02V0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J\x02V0:jro4m5d8:SPubeXNlImFqTIBWRTI5Kpt5NfWZ7ic1\x02D0:UdmHBajNonE_ONddXwP2679ks0EYxySf\x1AmyCustomEvent\x02\xF0\x01Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\x02\fChrome\x02\fChrome\x02\x16Google Inc.\x02\x0EBrowser\x02\x1871.0.3578.98\x02\"Personal computer\x02\bOS X\x02\x0E10.14.0\x02(Apple Computer, Inc.
   ```
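
Since the messages carry only the raw serialized record, one way to sanity-check them against a schema is to decode the leading fields by hand. The following is a minimal sketch, assuming the standard Avro binary encoding (a boolean is one byte, a long is a zigzag varint, a string is a long length followed by UTF-8 bytes). The helper names (`read_boolean`, `read_long`, `read_string`, `decode_prefix`) are hypothetical, and `SAMPLE` is only the leading part of the first message value shown above.

```python
from typing import List, Tuple

# Leading bytes of the first sample value above, up to and including
# the first length-prefixed string. Copied from the message dump.
SAMPLE = (
    b"\x00\x00\x00"              # three single-byte values
    b"\xB2\xED\xF0\xA2\x97Z"     # first varint
    b"\xDA\xCD\xF0\xA2\x97Z"     # second varint
    b"\x14172.25.0.1"            # length byte followed by text
)


def read_boolean(buf: bytes, pos: int) -> Tuple[bool, int]:
    """Avro boolean: a single byte, 0 for false and 1 for true."""
    return buf[pos] != 0, pos + 1


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Avro int/long: little-endian base-128 varint, zigzag encoded."""
    shift = 0
    acc = 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:      # high bit clear marks the last byte
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Avro string: a long byte length followed by UTF-8 data."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_prefix(buf: bytes) -> Tuple[List[bool], int, int, str]:
    """Read three booleans, two longs and one string, in that order."""
    pos = 0
    flags = []
    for _ in range(3):
        value, pos = read_boolean(buf, pos)
        flags.append(value)
    first_long, pos = read_long(buf, pos)
    second_long, pos = read_long(buf, pos)
    text, pos = read_string(buf, pos)
    return flags, first_long, second_long, text


flags, first_long, second_long, host = decode_prefix(SAMPLE)
print(flags, first_long, second_long, host)
```

Read this way, the sample appears to carry two consecutive longs between the three booleans and the `172.25.0.1` string, whereas the schema in the supervisor spec below declares a single `long` (`timestamp`) before `remoteHost`. If that reading is right, it may be worth checking which schema Divolte actually used to write these records.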
   
   
   
   Following the great tutorial by @Fokko 
[here](https://blog.godatadriven.com/divolte-kafka-druid-superset), I opted to 
use Druid's `schema_inline` Avro decoder in my supervisor spec.
   
   However, I am still getting parse errors in my ingestion reports.
   
   I'm wondering if something changed in the way the Avro extension works since 
@Fokko wrote that tutorial?
   
   Am I using the wrong Avro decoder?
   
   Is there simply no longer a decoder in the Druid Avro extension which can 
handle messages encoded this way?
   
   ## Supervisor
   
   ```json
   {
     "type": "kafka",
     "ioConfig": {
       "topic": "livestream",
       "consumerProperties": {
         "bootstrap.servers": "10.0.1.32:9092"
       },
       "taskCount": 1,
       "replicas": 1,
       "taskDuration": "PT5M"
     },
     "dataSchema": {
       "dataSource": "livestream",
       "parser": {
         "type": "avro_stream",
         "avroBytesDecoder": {
           "type": "schema_inline",
           "schema": {
             "namespace": "io.divolte.record",
             "type": "record",
             "name": "DefaultEventRecord",
             "fields": [
               { "name": "detectedDuplicate",       "type": "boolean" },
               { "name": "detectedCorruption",      "type": "boolean" },
               { "name": "firstInSession",          "type": "boolean" },
               { "name": "timestamp",               "type": "long" },
               { "name": "remoteHost",              "type": "string" },
               { "name": "referer",                 "type": ["null", "string"], "default": null },
               { "name": "location",                "type": ["null", "string"], "default": null },
               { "name": "viewportPixelWidth",      "type": ["null", "int"],    "default": null },
               { "name": "viewportPixelHeight",     "type": ["null", "int"],    "default": null },
               { "name": "screenPixelWidth",        "type": ["null", "int"],    "default": null },
               { "name": "screenPixelHeight",       "type": ["null", "int"],    "default": null },
               { "name": "partyId",                 "type": ["null", "string"], "default": null },
               { "name": "sessionId",               "type": ["null", "string"], "default": null },
               { "name": "pageViewId",              "type": ["null", "string"], "default": null },
               { "name": "eventType",               "type": "string",           "default": "unknown" },
               { "name": "userAgentString",         "type": ["null", "string"], "default": null },
               { "name": "userAgentName",           "type": ["null", "string"], "default": null },
               { "name": "userAgentFamily",         "type": ["null", "string"], "default": null },
               { "name": "userAgentVendor",         "type": ["null", "string"], "default": null },
               { "name": "userAgentType",           "type": ["null", "string"], "default": null },
               { "name": "userAgentVersion",        "type": ["null", "string"], "default": null },
               { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
               { "name": "userAgentOsFamily",       "type": ["null", "string"], "default": null },
               { "name": "userAgentOsVersion",      "type": ["null", "string"], "default": null },
               { "name": "userAgentOsVendor",       "type": ["null", "string"], "default": null },
               { "name": "technology",              "type": ["null", "string"], "default": null }
             ]
           }
         },
         "parseSpec": {
           "format": "timeAndDims",
           "timestampSpec": {
             "column": "timestamp",
             "format": "iso"
           },
           "dimensionsSpec": {
             "dimensions": [
               "detectedDuplicate",
               "detectedCorruption",
               "firstInSession",
               "remoteHost",
               "referer",
               "location",
               "viewportPixelWidth",
               "viewportPixelHeight",
               "screenPixelWidth",
               "screenPixelHeight",
               "partyId",
               "sessionId",
               "pageViewId",
               "eventType",
               "userAgentString",
               "userAgentName",
               "userAgentFamily",
               "userAgentVendor",
               "userAgentType",
               "userAgentVersion",
               "userAgentDeviceCategory",
               "userAgentOsFamily",
               "userAgentOsVersion",
               "userAgentOsVendor",
               "technology"
             ]
           }
         }
       },
       "metricsSpec": [
         {
           "name": "count",
           "type": "count"
         }
       ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "HOUR",
         "queryGranularity": "NONE"
       }
     },
     "tuningConfig": {
       "type": "kafka",
       "maxRowsPerSegment": 5000000,
       "logParseExceptions": true,
       "maxSavedParseExceptions": 100
     }
   }
   ```
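
A side note on the `timestampSpec` above: the schema declares `timestamp` as an Avro `long`, while the spec asks for `"format": "iso"`. Assuming Divolte's `timestamp()` writes epoch milliseconds (an assumption, not something confirmed in this report), the column would hold a number rather than an ISO 8601 string, and Druid's `millis` timestamp format might be the closer match. A small sketch of the difference, with a hypothetical helper `millis_to_iso` and an illustrative value:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_iso(epoch_millis: int) -> str:
    """Render an epoch-millisecond count as an ISO 8601 UTC string."""
    moment = EPOCH + timedelta(milliseconds=epoch_millis)
    return moment.isoformat(timespec="milliseconds")


# Illustrative epoch-millisecond value (an assumption for this sketch,
# not a value taken from the ingestion report).
example_millis = 1549311810393

print(example_millis)                 # what a long column would contain
print(millis_to_iso(example_millis))  # what an "iso" column would contain
```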
   
   ## Divolte Conf
   
   ### DefaultEventRecord.avsc
   
   ```json
   {
       "namespace": "io.divolte.record",
       "name": "DefaultEventRecord",
       "type": "record",
       "fields": [
           { "name": "detectedDuplicate",       "type": "boolean" },
           { "name": "detectedCorruption",      "type": "boolean" },
           { "name": "firstInSession",          "type": "boolean" },
           { "name": "timestamp",               "type": "long" },
           { "name": "remoteHost",              "type": "string" },
           { "name": "referer",                 "type": ["null", "string"], "default": null },
           { "name": "location",                "type": ["null", "string"], "default": null },
           { "name": "viewportPixelWidth",      "type": ["null", "int"],    "default": null },
           { "name": "viewportPixelHeight",     "type": ["null", "int"],    "default": null },
           { "name": "screenPixelWidth",        "type": ["null", "int"],    "default": null },
           { "name": "screenPixelHeight",       "type": ["null", "int"],    "default": null },
           { "name": "partyId",                 "type": ["null", "string"], "default": null },
           { "name": "sessionId",               "type": ["null", "string"], "default": null },
           { "name": "pageViewId",              "type": ["null", "string"], "default": null },
           { "name": "eventType",               "type": "string",           "default": "unknown" },
           { "name": "userAgentString",         "type": ["null", "string"], "default": null },
           { "name": "userAgentName",           "type": ["null", "string"], "default": null },
           { "name": "userAgentFamily",         "type": ["null", "string"], "default": null },
           { "name": "userAgentVendor",         "type": ["null", "string"], "default": null },
           { "name": "userAgentType",           "type": ["null", "string"], "default": null },
           { "name": "userAgentVersion",        "type": ["null", "string"], "default": null },
           { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
           { "name": "userAgentOsFamily",       "type": ["null", "string"], "default": null },
           { "name": "userAgentOsVersion",      "type": ["null", "string"], "default": null },
           { "name": "userAgentOsVendor",       "type": ["null", "string"], "default": null },
           { "name": "technology",              "type": ["null", "string"], "default": null }
       ]
   }
   ```
   
   ### mapping.groovy
   
   ```groovy
   mapping {
       map duplicate() onto 'detectedDuplicate'
       map corrupt() onto 'detectedCorruption'
       map firstInSession() onto 'firstInSession'
       map timestamp() onto 'timestamp'
       map remoteHost() onto 'remoteHost'
       map referer() onto 'referer'
       map location() onto 'location'
       map viewportPixelWidth() onto 'viewportPixelWidth'
       map viewportPixelHeight() onto 'viewportPixelHeight'
       map screenPixelWidth() onto 'screenPixelWidth'
       map screenPixelHeight() onto 'screenPixelHeight'
       map partyId() onto 'partyId'
       map sessionId() onto 'sessionId'
       map pageViewId() onto 'pageViewId'
       map eventType() onto 'eventType'
   
       map userAgentString() onto 'userAgentString'
       def ua = userAgent()
       map ua.name() onto 'userAgentName'
       map ua.family() onto 'userAgentFamily'
       map ua.vendor() onto 'userAgentVendor'
       map ua.type() onto 'userAgentType'
       map ua.version() onto 'userAgentVersion'
       map ua.deviceCategory() onto 'userAgentDeviceCategory'
       map ua.osFamily() onto 'userAgentOsFamily'
       map ua.osVersion() onto 'userAgentOsVersion'
       map ua.osVendor() onto 'userAgentOsVendor'
   
       map eventParameter('technology') onto 'technology'
   }
   ```
   
   ## Ingestion Report
   
   ```json
   {
     "ingestionStatsAndErrors": {
       "taskId": "index_kafka_livestream_0db516aaf980d12_bjfgbipd",
       "payload": {
         "ingestionState": "COMPLETED",
         "unparseableEvents": {
           "buildSegments": [
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!",
             "Fail to decode avro message!"
           ]
         },
         "rowStats": {
           "buildSegments": {
             "processed": 0,
             "processedWithError": 0,
             "thrownAway": 0,
             "unparseable": 24
           }
         },
         "errorMsg": null
       },
       "type": "ingestionStatsAndErrors"
     }
   }
   ```
