lokesh-lingarajan opened a new pull request #11630:
URL: https://github.com/apache/druid/pull/11630


   ### Description
   
   Today we ingest a number of high-cardinality metrics into Druid across dimensions such as tenant, user, TopicPartition, client ID, and more. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a per-partition or per-client basis. Events are another class of data that provides useful information about a particular incident/scenario inside a Kafka cluster. Events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in Kafka headers that can serve as dimensions for aggregation and, in turn, bring better insights.
   
   PR https://github.com/apache/druid/pull/10730 introduced support for Kafka headers in InputFormats.
   
   We still need an input format to parse the headers and translate them into relevant columns in Druid. Until that is implemented, none of the information available in the Kafka message headers is exposed. So first there is a need to write an input format that can parse headers in any given format (provided we support the format), just as we parse payloads today. Apart from headers, there is also some useful information present in the key portion of the Kafka record, so we also need a way to expose the data present in the key as Druid columns. We need a generic way to express at configuration time which attributes from the headers, key, and payload should be ingested into Druid, and the design should be generic enough that users can specify different parsers for the headers, key, and payload.
   
   This PR is designed to solve the above by providing a wrapper around any existing input format and merging the data into a single unified Druid row.
   
   Let's look at a sample input format from the above discussion:
   
   "inputFormat":
   {
       "type": "kafka",     // New input format type
       "headerLabelPrefix": "kafka.header.",   // Label prefix for header 
columns, this will avoid collusions while merging columns
       "recordTimestampLabelPrefix": "kafka.",  // Kafka record's timestamp is 
made available in case payload does not carry timestamp
       "headerFormat":  // Header parser specifying that values are of type 
string
       {
           "type": "string"
       },
       "valueFormat": // Value parser from json parsing
       {
           "type": "json",
           "flattenSpec": {
             "useFieldDiscovery": true,
             "fields": [...]
           }
       },
       "keyFormat":  // Key parser also from json parsing
       {
           "type": "json"
       }
   }
   
   Since we have independent sections for the header, key, and payload, each section can be parsed with its own parser, e.g., headers coming in as strings and the payload as JSON.
   
   KafkaInputFormat will be the umbrella class extending the InputFormat interface. It will be responsible for creating the individual parsers for the header, key, and payload, blending the data (resolving conflicts in columns), and generating a single unified InputRow for Druid ingestion.
   
   "headerFormat" will allow users to plug parser type for the header values 
and will add default header prefix as "kafka.header."(can be overridden) for 
attributes to avoid collision while merging attributes with payload.
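
   For illustration, here is a minimal sketch of that prefixing step in plain Java (not the PR's actual code; the `prefixHeaders` helper and the Map-based row shape are assumptions for the example):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderPrefixExample
{
  // Turn raw Kafka header bytes into prefixed Druid columns,
  // e.g. header "version" -> column "kafka.header.version".
  static Map<String, String> prefixHeaders(Map<String, byte[]> headers, String prefix)
  {
    Map<String, String> columns = new HashMap<>();
    for (Map.Entry<String, byte[]> h : headers.entrySet()) {
      columns.put(prefix + h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8));
    }
    return columns;
  }

  public static void main(String[] args)
  {
    Map<String, byte[]> headers = new HashMap<>();
    headers.put("version", "5".getBytes(StandardCharsets.UTF_8));
    headers.put("env", "prod".getBytes(StandardCharsets.UTF_8));
    // Prints something like {kafka.header.env=prod, kafka.header.version=5}
    System.out.println(prefixHeaders(headers, "kafka.header."));
  }
}
```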
   
   The Kafka payload parser will be responsible for parsing the Value portion of the Kafka record. This is where most of the data will come from, and we should be able to plug in existing parsers. One thing to note here is that if batching is performed, the code augments every record in the batch with the header and key values, as sketched below.
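
   A minimal sketch of that batching behavior, again with assumed Map-based rows and a hypothetical `augmentBatch` helper:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchAugmentExample
{
  // If one Kafka record's value parses into several rows (a batch), the
  // same header/key columns are stamped onto every row in that batch.
  static List<Map<String, Object>> augmentBatch(
      List<Map<String, Object>> payloadRows,
      Map<String, Object> headerAndKeyColumns
  )
  {
    List<Map<String, Object>> augmented = new ArrayList<>();
    for (Map<String, Object> row : payloadRows) {
      Map<String, Object> merged = new HashMap<>(headerAndKeyColumns);
      merged.putAll(row); // payload values win on name conflicts (see the conflict-resolution section below)
      augmented.add(merged);
    }
    return augmented;
  }
}
```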
   
   The Kafka key parser will handle parsing the Key portion of the Kafka record and will ingest the key with the dimension name "kafka.key".
   
   ## KafkaInputFormat Class
   This is the class that orchestrates sending the consumer record to each parser, retrieving the resulting rows, and merging the columns into one final row for Druid consumption. KafkaInputFormat should make sure to release the resources allocated as part of the reader's CloseableIterator<InputRow>, in both normal and exception cases.
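
   A minimal sketch of the close-on-all-paths pattern (the nested CloseableIterator interface here is a local stand-in for Druid's, and `drain` is a hypothetical helper):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

public class CloseOnAllPathsExample
{
  // Local stand-in for Druid's CloseableIterator: an Iterator that is
  // also Closeable so parser resources can be released.
  interface CloseableIterator<T> extends Iterator<T>, Closeable {}

  // try-with-resources guarantees close() runs whether iteration
  // finishes normally or hasNext()/next() throws.
  static <T> void drain(CloseableIterator<T> rows) throws IOException
  {
    try (CloseableIterator<T> it = rows) {
      while (it.hasNext()) {
        it.next(); // hand each row to the ingestion pipeline
      }
    }
  }
}
```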
   
   During conflicts in dimension/metric names, the code prefers dimension names from the payload and ignores the conflicting dimension from the headers/key. This is done so that existing input formats can be migrated to this new format without worrying about losing information.
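
   Sketched in plain Java, this precedence rule amounts to applying the header/key columns first and letting the payload overwrite them (`mergePreferPayload` is a hypothetical helper; the conflict shown assumes the header prefix was overridden to an empty string):

```java
import java.util.HashMap;
import java.util.Map;

public class PayloadWinsExample
{
  // Header/key columns are applied first; payload columns are applied
  // last and therefore overwrite any same-named entries.
  static Map<String, Object> mergePreferPayload(
      Map<String, Object> headerAndKeyColumns,
      Map<String, Object> payloadColumns
  )
  {
    Map<String, Object> merged = new HashMap<>(headerAndKeyColumns);
    merged.putAll(payloadColumns);
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, Object> fromHeaderAndKey = new HashMap<>();
    fromHeaderAndKey.put("kafka.key", "user-42");
    fromHeaderAndKey.put("region", "from-header"); // e.g., with an empty headerLabelPrefix
    Map<String, Object> fromPayload = new HashMap<>();
    fromPayload.put("region", "from-payload");
    // "region" resolves to "from-payload": the payload value wins.
    System.out.println(mergePreferPayload(fromHeaderAndKey, fromPayload));
  }
}
```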
   
   This code has been tested locally (Mac laptop) with production data, with the following performance numbers:
   - Vanilla payload parser using JSON: 19 million records ingested/hour
   - New Kafka input format with header parsing: 17 million records ingested/hour

   This code is currently deployed in lab/stage environments and is ingesting over 100 million records/day.
   
   @xvrl 
   
   This PR has:
   - [x] been self-reviewed.
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [x] added integration tests.
   - [x] been tested in a test Druid cluster.
   

