Lokesh, it looks like you got dropped from the thread, so I'm adding you
back. Please check out the previous message for some comments.

By the way, by default, replies to the dev list go back to the dev list
only, which can cause you to miss some replies. If you join the list you
will be sure to get all your replies 🙂

On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <g...@apache.org> wrote:

> Hey Lokesh,
>
> The concept and API look solid to me! Thank you for writing this up. I
> agree with Ben's comment. This will be really useful functionality.
>
> I have a few questions about how it would work:
>
> 1) How is the timestamp exposed exactly? I see there is a
> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
> think about accepting the entire name of the timestamp field instead?
> Finally: in the docs it would be good to have an example of how people can
> write a timestampSpec that refers to the Kafka timestamp, and also how they
> can load the Kafka timestamp as a long-typed dimension storing millis since
> the epoch (our convention for secondary timestamps).
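>
> For example (just a sketch of what I have in mind, and assuming the record
> timestamp ends up exposed as "kafka.timestamp" under that prefix), the docs
> could show something like:
>
> "timestampSpec": {
>   "column": "kafka.timestamp",   // assumed column name; depends on how the prefix is applied
>   "format": "millis"
> }
>
> and, for loading it as a secondary timestamp, a long-typed dimension:
>
> "dimensionsSpec": {
>   "dimensions": [
>     { "type": "long", "name": "kafka.timestamp" }   // millis since the epoch, stored as a long
>   ]
> }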
>
> 2) You mention that the key will show up as "kafka.key", and in the
> example you provide I don't see a parameter enabling a choice of what that
> field is called. Is it hard-coded or is it configurable somehow?
>
> 3) Could you write up some user-facing docs too, like an addition to
> development/extensions-core/kafka-ingestion.md? That way, people will know
> how to use this feature. And it'll help us better understand how it's
> supposed to work. (Perhaps it could have answered the two questions above)
>
> Full disclosure: I haven't reviewed the patch yet; these questions are
> just based on your writeup.
>
> On Mon, Aug 30, 2021 at 3:00 PM Lokesh Lingarajan
> <llingara...@confluent.io.invalid> wrote:
>
>> Motivation
>>
>> Today we ingest a number of high cardinality metrics into Druid across
>> dimensions. These metrics are rolled up on a per-minute basis, and are very
>> useful when looking at metrics on a partition or client basis. Events are
>> another class of data that provides useful information about a particular
>> incident/scenario inside a Kafka cluster. Events themselves are carried
>> inside the Kafka payload, but there is also some very useful metadata
>> carried in Kafka headers that can serve as useful dimensions for
>> aggregation and in turn bring better insights.
>>
>> PR #10730 <https://github.com/apache/druid/pull/10730> introduced support
>> for Kafka headers in InputFormats.
>>
>> We still need an input format to parse out the headers and translate them
>> into relevant columns in Druid. Until that is implemented, none of the
>> information available in the Kafka message headers is exposed. So first we
>> need an input format that can parse headers in any supported format, just
>> as we parse payloads today. Apart from the headers, there is also useful
>> information in the key portion of the Kafka record, and we need a way to
>> expose that data as Druid columns as well. Finally, we need a generic way
>> to express at configuration time which attributes from the headers, key
>> and payload should be ingested into Druid, and the design should be
>> generic enough that users can specify different parsers for the headers,
>> key and payload.
>>
>> The proposal is to design an input format that solves the above by
>> providing a wrapper around any existing input format and merging the data
>> into a single unified Druid row.
>>
>> Proposed changes
>>
>> Let's look at a sample input format from the above discussion:
>>
>> "inputFormat": {
>>   "type": "kafka",                        // New input format type
>>   "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns; avoids collisions while merging columns
>>   "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made available in case the payload does not carry a timestamp
>>   "headerFormat": {                       // Header parser specifying that values are of type string
>>     "type": "string"
>>   },
>>   "valueFormat": {                        // Value parser, here parsing JSON
>>     "type": "json",
>>     "flattenSpec": {
>>       "useFieldDiscovery": true,
>>       "fields": [...]
>>     }
>>   },
>>   "keyFormat": {                          // Key parser, also parsing JSON
>>     "type": "json"
>>   }
>> }
>>
>> Since we have independent sections for the header, key and payload, this
>> also enables parsing each section with its own parser, e.g., headers coming
>> in as strings and the payload as JSON.
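>>
>> As a rough sketch (the column names below are just made-up examples), the
>> same wrapper could combine a string header parser with, say, the existing
>> csv value parser:
>>
>> "inputFormat": {
>>   "type": "kafka",
>>   "headerFormat": { "type": "string" },
>>   "valueFormat": {
>>     "type": "csv",                                  // existing Druid csv input format
>>     "columns": ["ts", "clusterId", "metricValue"]   // made-up column names
>>   },
>>   "keyFormat": { "type": "json" }
>> }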
>>
>> KafkaInputFormat (the new InputFormat class) will be the wrapper class
>> implementing the InputFormat interface. It will be responsible for creating
>> the individual parsers for the header, key and payload, blending the data,
>> resolving column conflicts, and generating a single unified InputRow for
>> Druid ingestion.
>>
>> "headerFormat" will allow users to plug in a parser type for the header
>> values and will add the default header prefix as "kafka.header."(can be
>> overridden) for attributes to avoid collision while merging attributes
>> with
>> payload.
>>
>> The Kafka payload parser will be responsible for parsing the Value portion
>> of the Kafka record. This is where most of the data will come from, and we
>> should be able to plug in existing parsers. One thing to note here is that
>> if batching is performed, the code should attach the header and key values
>> to every record in the batch.
>>
>> The Kafka key parser will handle parsing the Key portion of the Kafka
>> record and will ingest the Key under the dimension name "kafka.key".
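>>
>> As a rough illustration (the header name "environment" below is just a
>> made-up example), the merged row could then be referenced in a
>> dimensionsSpec like this:
>>
>> "dimensionsSpec": {
>>   "dimensions": [
>>     "kafka.key",                  // key ingested under this dimension name
>>     "kafka.header.environment"    // "environment" is a made-up header name
>>   ]
>> }
>>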
>> Operational impact, Test plan & Future work
>>
>> Since we had an immediate need to ingest blended data from the header and
>> payload, we have implemented the above proposal in a PR here:
>> <https://github.com/apache/druid/pull/11630>
>>
>> -Lokesh Lingarajan
>>
>
