FrankChen021 commented on code in PR #18525:
URL: https://github.com/apache/druid/pull/18525#discussion_r2460900097
##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -264,6 +265,105 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>
+#### Header-based filtering
+
+Header-based filtering allows you to filter Kafka records based on their headers before ingestion, reducing the amount of data processed and improving ingestion performance. This is particularly useful when you want to ingest only a subset of records from a Kafka topic based on header values.
+
+The following table outlines the configuration options for `headerBasedFilterConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`filter`|Object|A Druid filter specification that defines which records to include based on header values. Only `in` filters are supported.|Yes||
+|`encoding`|String|The character encoding used to decode header values. Supported encodings include `UTF-8`, `UTF-16`, `ISO-8859-1`, `US-ASCII`, `UTF-16BE`, and `UTF-16LE`.|No|`UTF-8`|
+|`stringDecodingCacheSize`|Integer|The maximum number of decoded header strings to cache in memory. Set to a higher value for better performance when processing many unique header values.|No|10000|
+
+##### Header-based filtering example
+
+The following example shows how to configure header-based filtering to ingest only records where the `environment` header has the value `production` or `staging`:
+
+<details>
+ <summary>Click to view the example</summary>
+
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "metrics-kafka",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [],
+ "dimensionExclusions": [
+ "timestamp",
+ "value"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "value_sum",
+ "fieldName": "value",
+ "type": "doubleSum"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "HOUR",
+ "queryGranularity": "NONE"
+ }
+ },
+ "ioConfig": {
+ "topic": "metrics",
+ "inputFormat": {
+ "type": "json"
+ },
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9092"
+ },
+ "headerBasedFilterConfig": {
+ "filter": {
+ "type": "in",
+ "dimension": "environment",
+ "values": ["production", "staging"]
+ },
+ "encoding": "UTF-8",
+ "stringDecodingCacheSize": 10000
+ },
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT1H"
+ },
+ "tuningConfig": {
+ "type": "kafka",
+ "maxRowsPerSegment": 5000000
+ }
+ }
+}
+```
+</details>
+
+In this example:
+- Only records with `environment` header values of `production` or `staging` will be ingested
Review Comment:
   Based on the implementation below, the description here is incomplete:
   - records that do not have the `environment` header are also included
   - records that have the `environment` header but a null header value are also included
   - records that have the `environment` header with a value, but whose value fails to decode, are also included

   But from the semantics of the `filter` itself, I think these three cases are contradictory. My understanding is that only records that have the `environment` header with either the `production` or `staging` value should be included, meaning that the records above should NOT be included.
   Correct me if I'm wrong.
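
   The contrast being drawn here can be sketched as two predicates. This is a hypothetical illustration, not Druid's actual implementation: the function names, the hard-coded `environment` key, and the dict-of-bytes header representation are all assumptions made for the sketch.

   ```python
   def include_lenient(headers, allowed, encoding="utf-8"):
       """Behavior the review describes: a missing header, a null value,
       or an undecodable value all fall through to 'include'."""
       if "environment" not in headers:
           return True                 # no header at all -> included
       raw = headers["environment"]
       if raw is None:
           return True                 # header present, null value -> included
       try:
           value = raw.decode(encoding)
       except UnicodeDecodeError:
           return True                 # value fails to decode -> included
       return value in allowed

   def include_strict(headers, allowed, encoding="utf-8"):
       """Semantics expected from an `in` filter: only records whose header
       decodes to an allowed value are included; everything else is dropped."""
       raw = headers.get("environment")
       if raw is None:
           return False                # missing header or null value -> excluded
       try:
           value = raw.decode(encoding)
       except UnicodeDecodeError:
           return False                # undecodable value -> excluded
       return value in allowed
   ```

   The two predicates agree on records that carry a decodable `environment` value and disagree on exactly the three edge cases listed above, which is the contradiction being pointed out.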
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]