kad3nce opened a new issue #7001: Which parser to use for Avro encoded messages from Divolte?
URL: https://github.com/apache/incubator-druid/issues/7001

I am trying to ingest messages from Kafka that were written by Divolte. Divolte uses Avro encoding for the messages it writes to Kafka, and I believe it builds the Avro messages in a non-standard way. This is from [Divolte's docs](http://divolte-releases.s3-website-eu-west-1.amazonaws.com/divolte-collector/0.9.0/userdoc/html/getting_started.html#writing-to-kafka):

> ## Data in Kafka
> Avro files on HDFS are written with the schema in the header. Unfortunately Kafka doesn’t really have a clear way of passing along the schema. For the messages on Kafka queues we expect the consumer to know the schema in advance, meaning that the messages that are passed onto the queue only contain the raw bytes of the serialized Avro record without any metadata. The key of each message is the party ID that for the event. Divolte Collector provides a small helper library to easily create Kafka consumers in Java using Avro’s code generation support. There is an example Kafka consumer with step by step instruction on getting it up and running in our usage examples repository here: https://github.com/divolte/divolte-examples/tree/master/tcp-kafka-consumer.

And the messages Divolte is putting on the Kafka queue look like this:

```
key: 0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J
value: \x00\x00\x00\xB2\xED\xF0\xA2\x97Z\xDA\xCD\xF0\xA2\x97Z\x14172.25.0.1\x00\x02\x90\x02file:///Users/.../frontend/test/index.html\x02\x80\x1E\x02\xEE\n\x02\x80\x1E\x02\xB2\x12\x02V0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J\x02V0:jro4m5d8:SPubeXNlImFqTIBWRTI5Kpt5NfWZ7ic1\x02D0:UdmHBajNonE_ONddXwP2679ks0EYxySf\x1AmyCustomEvent\x02\xF0\x01Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\x02\fChrome\x02\fChrome\x02\x16Google Inc.\x02\x0EBrowser\x02\x1871.0.3578.98\x02\"Personal computer\x02\bOS X\x02\x0E10.14.0\x02(Apple Computer, Inc.
```

```
key: 0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J
value: \x00\x00\x00\x9A\x91\xF1\xA2\x97Z\xE6\xF3\xF0\xA2\x97Z\x14172.25.0.1\x00\x02\x90\x02file:///Users/.../frontend/test/index.html\x02\x80\x1E\x02\xEE\n\x02\x80\x1E\x02\xB2\x12\x02V0:jro4m5d8:xilY4lXNlasIzvVbEYDFECnbepfZOb_J\x02V0:jro4m5d8:SPubeXNlImFqTIBWRTI5Kpt5NfWZ7ic1\x02D0:UdmHBajNonE_ONddXwP2679ks0EYxySf\x1AmyCustomEvent\x02\xF0\x01Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36\x02\fChrome\x02\fChrome\x02\x16Google Inc.\x02\x0EBrowser\x02\x1871.0.3578.98\x02\"Personal computer\x02\bOS X\x02\x0E10.14.0\x02(Apple Computer, Inc.
```

Following the great tutorial by @Fokko [here](https://blog.godatadriven.com/divolte-kafka-druid-superset), I opted to use Druid's `schema_inline` Avro decoder in my supervisor spec. However, I am still getting parse errors in my ingestion reports. I'm wondering if something changed in the way the Avro extension works since @Fokko wrote that tutorial? Am I using the wrong Avro decoder? Is there simply no longer a decoder in the Druid Avro extension which can handle messages encoded this way?
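
To make the "raw bytes without any metadata" description concrete, here is a minimal sketch that reads the leading bytes of the first sample value by hand, following the Avro binary encoding (a boolean is one byte, a long is a zigzag-encoded variable-length integer). This is not Druid's or Divolte's decoder: the helper names `read_boolean`, `read_long`, and `decode_prefix` are made up for illustration, and the byte array is transcribed from the escaped dump above on the assumption that the printable `Z` stands for byte `0x5A`.

```python
# Hand-rolled reader for the start of a raw Avro record.
# All helper names are hypothetical; only the first four schema fields are read.
from datetime import datetime, timezone

# Leading bytes of the first sample value, transcribed from the escaped dump
# (assumption: the printable "Z" is the byte 0x5A).
SAMPLE = bytes([0x00, 0x00, 0x00, 0xB2, 0xED, 0xF0, 0xA2, 0x97, 0x5A])


def read_boolean(buf, pos):
    """Avro boolean: a single byte, 0 for false and 1 for true."""
    return buf[pos] != 0, pos + 1


def read_long(buf, pos):
    """Avro long: little-endian base-128 varint, then zigzag decoding."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def decode_prefix(buf):
    """Read the three boolean flags and the timestamp, in schema order."""
    pos = 0
    record = {}
    for name in ("detectedDuplicate", "detectedCorruption", "firstInSession"):
        record[name], pos = read_boolean(buf, pos)
    record["timestamp"], pos = read_long(buf, pos)
    return record, pos


record, consumed = decode_prefix(SAMPLE)
print(record, "bytes consumed:", consumed)
# Interpreting the long as epoch milliseconds is an assumption.
print(datetime.fromtimestamp(record["timestamp"] / 1000, tz=timezone.utc))
```

If the transcription is right, the three flags read as false and the fourth field decodes to a long that is plausible as epoch milliseconds, which is consistent with the docs' description of a bare Avro record with no header in front of it.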

## Supervisor

```json
{
  "type": "kafka",
  "ioConfig": {
    "topic": "livestream",
    "consumerProperties": {
      "bootstrap.servers": "10.0.1.32:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  },
  "dataSchema": {
    "dataSource": "livestream",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_inline",
        "schema": {
          "namespace": "io.divolte.record",
          "type": "record",
          "name": "DefaultEventRecord",
          "fields": [
            { "name": "detectedDuplicate", "type": "boolean" },
            { "name": "detectedCorruption", "type": "boolean" },
            { "name": "firstInSession", "type": "boolean" },
            { "name": "timestamp", "type": "long" },
            { "name": "remoteHost", "type": "string" },
            { "name": "referer", "type": ["null", "string"], "default": null },
            { "name": "location", "type": ["null", "string"], "default": null },
            { "name": "viewportPixelWidth", "type": ["null", "int"], "default": null },
            { "name": "viewportPixelHeight", "type": ["null", "int"], "default": null },
            { "name": "screenPixelWidth", "type": ["null", "int"], "default": null },
            { "name": "screenPixelHeight", "type": ["null", "int"], "default": null },
            { "name": "partyId", "type": ["null", "string"], "default": null },
            { "name": "sessionId", "type": ["null", "string"], "default": null },
            { "name": "pageViewId", "type": ["null", "string"], "default": null },
            { "name": "eventType", "type": "string", "default": "unknown" },
            { "name": "userAgentString", "type": ["null", "string"], "default": null },
            { "name": "userAgentName", "type": ["null", "string"], "default": null },
            { "name": "userAgentFamily", "type": ["null", "string"], "default": null },
            { "name": "userAgentVendor", "type": ["null", "string"], "default": null },
            { "name": "userAgentType", "type": ["null", "string"], "default": null },
            { "name": "userAgentVersion", "type": ["null", "string"], "default": null },
            { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
            { "name": "userAgentOsFamily", "type": ["null", "string"], "default": null },
            { "name": "userAgentOsVersion", "type": ["null", "string"], "default": null },
            { "name": "userAgentOsVendor", "type": ["null", "string"], "default": null },
            { "name": "technology", "type": ["null", "string"], "default": null }
          ]
        }
      },
      "parseSpec": {
        "format": "timeAndDims",
        "timestampSpec": {
          "column": "timestamp",
          "format": "iso"
        },
        "dimensionsSpec": {
          "dimensions": [
            "detectedDuplicate",
            "detectedCorruption",
            "firstInSession",
            "remoteHost",
            "referer",
            "location",
            "viewportPixelWidth",
            "viewportPixelHeight",
            "screenPixelWidth",
            "screenPixelHeight",
            "partyId",
            "sessionId",
            "pageViewId",
            "eventType",
            "userAgentString",
            "userAgentName",
            "userAgentFamily",
            "userAgentVendor",
            "userAgentType",
            "userAgentVersion",
            "userAgentDeviceCategory",
            "userAgentOsFamily",
            "userAgentOsVersion",
            "userAgentOsVendor",
            "technology"
          ]
        }
      }
    },
    "metricsSpec": [{ "name": "count", "type": "count" }],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000,
    "logParseExceptions": true,
    "maxSavedParseExceptions": 100
  }
}
```

## Divolte Conf

### DefaultEventRecord.avsc

```json
{
  "namespace": "io.divolte.record",
  "name": "DefaultEventRecord",
  "type": "record",
  "fields": [
    { "name": "detectedDuplicate", "type": "boolean" },
    { "name": "detectedCorruption", "type": "boolean" },
    { "name": "firstInSession", "type": "boolean" },
    { "name": "timestamp", "type": "long" },
    { "name": "remoteHost", "type": "string" },
    { "name": "referer", "type": ["null", "string"], "default": null },
    { "name": "location", "type": ["null", "string"], "default": null },
    { "name": "viewportPixelWidth", "type": ["null", "int"], "default": null },
    { "name": "viewportPixelHeight", "type": ["null", "int"], "default": null },
    { "name": "screenPixelWidth", "type": ["null", "int"], "default": null },
    { "name": "screenPixelHeight", "type": ["null", "int"], "default": null },
    { "name": "partyId", "type": ["null", "string"], "default": null },
    { "name": "sessionId", "type": ["null", "string"], "default": null },
    { "name": "pageViewId", "type": ["null", "string"], "default": null },
    { "name": "eventType", "type": "string", "default": "unknown" },
    { "name": "userAgentString", "type": ["null", "string"], "default": null },
    { "name": "userAgentName", "type": ["null", "string"], "default": null },
    { "name": "userAgentFamily", "type": ["null", "string"], "default": null },
    { "name": "userAgentVendor", "type": ["null", "string"], "default": null },
    { "name": "userAgentType", "type": ["null", "string"], "default": null },
    { "name": "userAgentVersion", "type": ["null", "string"], "default": null },
    { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
    { "name": "userAgentOsFamily", "type": ["null", "string"], "default": null },
    { "name": "userAgentOsVersion", "type": ["null", "string"], "default": null },
    { "name": "userAgentOsVendor", "type": ["null", "string"], "default": null },
    { "name": "technology", "type": ["null", "string"], "default": null }
  ]
}
```

### mapping.groovy

```groovy
mapping {
  map duplicate() onto 'detectedDuplicate'
  map corrupt() onto 'detectedCorruption'
  map firstInSession() onto 'firstInSession'
  map timestamp() onto 'timestamp'
  map remoteHost() onto 'remoteHost'
  map referer() onto 'referer'
  map location() onto 'location'
  map viewportPixelWidth() onto 'viewportPixelWidth'
  map viewportPixelHeight() onto 'viewportPixelHeight'
  map screenPixelWidth() onto 'screenPixelWidth'
  map screenPixelHeight() onto 'screenPixelHeight'
  map partyId() onto 'partyId'
  map sessionId() onto 'sessionId'
  map pageViewId() onto 'pageViewId'
  map eventType() onto 'eventType'
  map userAgentString() onto 'userAgentString'

  def ua = userAgent()
  map ua.name() onto 'userAgentName'
  map ua.family() onto 'userAgentFamily'
  map ua.vendor() onto 'userAgentVendor'
  map ua.type() onto 'userAgentType'
  map ua.version() onto 'userAgentVersion'
  map ua.deviceCategory() onto 'userAgentDeviceCategory'
  map ua.osFamily() onto 'userAgentOsFamily'
  map ua.osVersion() onto 'userAgentOsVersion'
  map ua.osVendor() onto 'userAgentOsVendor'

  map eventParameter('technology') onto 'technology'
}
```

## Ingestion Report

```json
{
  "ingestionStatsAndErrors": {
    "taskId": "index_kafka_livestream_0db516aaf980d12_bjfgbipd",
    "payload": {
      "ingestionState": "COMPLETED",
      "unparseableEvents": {
        "buildSegments": [
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!",
          "Fail to decode avro message!"
        ]
      },
      "rowStats": {
        "buildSegments": {
          "processed": 0,
          "processedWithError": 0,
          "thrownAway": 0,
          "unparseable": 24
        }
      },
      "errorMsg": null
    },
    "type": "ingestionStatsAndErrors"
  }
}
```
