This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2855fb6ff8 Change Kafka Lookup Extractor to not register consumer group (#12842)
2855fb6ff8 is described below
commit 2855fb6ff8181d83fa679db26c8bbddf839391fd
Author: David Palmer <[email protected]>
AuthorDate: Tue Aug 9 22:44:22 2022 +1200
Change Kafka Lookup Extractor to not register consumer group (#12842)
* change kafka lookups module to not commit offsets
The current behaviour of the Kafka lookup extractor is to not commit
offsets by assigning a unique ID to the consumer group and setting
auto.offset.reset to earliest. This does the job but also pollutes the
Kafka broker with a bunch of "ghost" consumer groups that will never
again be used.
To fix this, we now set enable.auto.commit to false, which prevents the
ghost consumer groups being created in the first place.
* update docs to include new enable.auto.commit setting behaviour
* update kafka-lookup-extractor documentation
Provide some additional detail on functionality and configuration.
Hopefully this will make it clearer how the extractor works for
developers who aren't so familiar with Kafka.
* add comments better explaining the logic of the code
* add spelling exceptions for kafka lookup docs
---
.../extensions-core/kafka-extraction-namespace.md | 54 ++++++++++++++++------
.../query/lookup/KafkaLookupExtractorFactory.java | 19 +++++++-
.../lookup/KafkaLookupExtractorFactoryTest.java | 16 +++++++
website/.spelling | 8 ++++
4 files changed, 82 insertions(+), 15 deletions(-)
diff --git a/docs/development/extensions-core/kafka-extraction-namespace.md b/docs/development/extensions-core/kafka-extraction-namespace.md
index 2c8a9c293b..93e6858ca4 100644
--- a/docs/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/development/extensions-core/kafka-extraction-namespace.md
@@ -32,22 +32,50 @@ If you need updates to populate as promptly as possible, it is possible to plug
{
"type":"kafka",
"kafkaTopic":"testTopic",
- "kafkaProperties":{"zookeeper.connect":"somehost:2181/kafka"}
+ "kafkaProperties":{
+ "bootstrap.servers":"kafka.service:9092"
+ }
}
```
-|Parameter|Description|Required|Default|
-|---------|-----------|--------|-------|
-|`kafkaTopic`|The Kafka topic to read the data from|Yes||
-|`kafkaProperties`|Kafka consumer properties. At least "zookeeper.connect" must be specified. Only the zookeeper connector is supported|Yes||
-|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not wait)|
-|`isOneToOne`|The map is a one-to-one (see [Lookup DimensionSpecs](../../querying/dimensionspecs.md))|No|`false`|
+| Parameter         | Description                                                                             | Required | Default           |
+|-------------------|-----------------------------------------------------------------------------------------|----------|-------------------|
+| `kafkaTopic`      | The Kafka topic to read the data from                                                   | Yes      |                   |
+| `kafkaProperties` | Kafka consumer properties (`bootstrap.servers` must be specified)                       | Yes      |                   |
+| `connectTimeout`  | How long to wait for an initial connection                                              | No       | `0` (do not wait) |
+| `isOneToOne`      | The map is a one-to-one (see [Lookup DimensionSpecs](../../querying/dimensionspecs.md)) | No       | `false`           |
-The extension `kafka-extraction-namespace` enables reading from a Kafka feed which has name/key pairs to allow renaming of dimension values. An example use case would be to rename an ID to a human readable format.
+The extension `kafka-extraction-namespace` enables reading from an [Apache Kafka](https://kafka.apache.org/) topic which has name/key pairs to allow renaming of dimension values. An example use case would be to rename an ID to a human-readable format.
-The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()` and `smallest` respectively.
+## How it Works
-See [lookups](../../querying/lookups.md) for how to configure and use lookups.
+The extractor works by consuming the configured Kafka topic from the beginning, and appending every record to an internal map. The key of the Kafka record is used as the key of the map, and the payload of the record is used as the value. At query time, a lookup can be used to transform the key into the associated value. See [lookups](../../querying/lookups.md) for how to configure and use lookups in a query. Keys and values are both stored as strings by the lookup extractor.
+
+The extractor remains subscribed to the topic, so new records are added to the lookup map as they appear. This allows for lookup values to be updated in near-realtime. If two records are added to the topic with the same key, the record with the larger offset will replace the previous record in the lookup map. A record with a `null` payload will be treated as a tombstone record, and the associated key will be removed from the lookup map.
+
+The extractor treats the input topic much like a [KTable](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KTable.html). As such, it is best to create your Kafka topic using a [log compaction](https://kafka.apache.org/documentation/#compaction) strategy, so that the most-recent version of a key is always preserved in Kafka. Without properly configuring retention and log compaction, older keys that are automatically removed from Kafka will not be available and will be [...]
+
+### Example
+
+Consider a `country_codes` topic being consumed, with the following records added to the topic in the following order:
+
+| Offset | Key | Payload |
+|--------|-----|-------------|
+| 1 | NZ | Nu Zeelund |
+| 2 | AU | Australia |
+| 3 | NZ | New Zealand |
+| 4 | AU | `null` |
+| 5 | NZ | Aotearoa |
+| 6 | CZ | Czechia |
+
+This input topic would be consumed from the beginning, and result in a lookup namespace containing the following mappings (notice that the entry for _Australia_ was added and then deleted):
+
+| Key | Value |
+|-----|-----------|
+| NZ | Aotearoa |
+| CZ | Czechia |
+
+Now when a query uses this extraction namespace, the country codes can be mapped to the full country name at query time.
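The replay behaviour described above (later offsets win, `null` payloads act as tombstones) can be sketched in plain Java. This is an illustrative simulation only; the class and method names are hypothetical and not part of the Druid extension:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: replay a compacted topic's records, in offset order,
// into a lookup map the way the extractor's internal map would end up.
public class LookupReplay
{
  // Each record is a {key, payload} pair; a null payload is a tombstone.
  public static Map<String, String> replay(String[][] records)
  {
    final Map<String, String> lookup = new LinkedHashMap<>();
    for (String[] record : records) {
      final String key = record[0];
      final String payload = record[1];
      if (payload == null) {
        lookup.remove(key);        // tombstone: drop the key entirely
      } else {
        lookup.put(key, payload);  // larger offset replaces earlier value
      }
    }
    return lookup;
  }

  public static void main(String[] args)
  {
    // The country_codes example from the table above
    Map<String, String> lookup = replay(new String[][]{
        {"NZ", "Nu Zeelund"},
        {"AU", "Australia"},
        {"NZ", "New Zealand"},
        {"AU", null},
        {"NZ", "Aotearoa"},
        {"CZ", "Czechia"}
    });
    System.out.println(lookup);
  }
}
```

Running this with the six example records leaves only the `NZ` and `CZ` keys, matching the mappings shown above.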
## Tombstones and Deleting Records
@@ -55,9 +83,9 @@ The Kafka lookup extractor treats `null` Kafka messages as tombstones. This mean
## Limitations
-Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys.
-off-heap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
-There is currently no eviction policy.
+The consumer properties `group.id`, `auto.offset.reset` and `enable.auto.commit` cannot be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()`, `earliest` and `false` respectively. This is because the entire topic must be consumed by the Druid service from the very beginning so that a complete map of lookup values can be built. Setting any of these consumer properties will cause the extractor to not start.
+
+Currently, the Kafka lookup extractor feeds the entire Kafka topic into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys. Off-heap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored. There is currently no eviction policy.
## Testing the Kafka rename functionality
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index e0bd3e082c..6d5d393a0b 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -357,6 +357,13 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
return future;
}
+  /**
+   * Check that the user has not set forbidden Kafka consumer props
+   *
+   * Some consumer properties must be set in order to guarantee that
+   * the consumer will consume the entire topic from the beginning.
+   * Otherwise, lookup data may not be loaded completely.
+   */
private void verifyKafkaProperties()
{
if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
@@ -367,10 +374,17 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
throw new IAE(
-          "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
+          "Cannot set kafka property [auto.offset.reset]. Property will be forced to [earliest]. Found [%s]",
kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
);
}
+    if (kafkaProperties.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) &&
+        !kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
+      throw new IAE(
+          "Cannot set kafka property [enable.auto.commit]. Property will be forced to [false]. Found [%s]",
+          kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+      );
+    }
Preconditions.checkNotNull(
kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
"bootstrap.servers required property"
@@ -398,9 +412,10 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
{
final Properties properties = new Properties();
properties.putAll(kafkaProperties);
-    // Enable publish-subscribe
+    // Set the consumer to consume everything and never commit offsets
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
+    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return properties;
}
}
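Outside of Druid, the property validation shown in the hunks above can be sketched as standalone Java. The class name is hypothetical and `IllegalArgumentException` stands in for Druid's `IAE`; the checks mirror `verifyKafkaProperties()` but this is not the extension's actual code:

```java
import java.util.Map;

// Sketch of the forbidden-property check the extension performs on startup.
// group.id and auto.offset.reset may never be set by the user; enable.auto.commit
// is tolerated only when it already matches the forced value of "false".
public class ConsumerPropsCheck
{
  public static void verify(Map<String, String> kafkaProperties)
  {
    if (kafkaProperties.containsKey("group.id")) {
      throw new IllegalArgumentException("Cannot set kafka property [group.id]");
    }
    if (kafkaProperties.containsKey("auto.offset.reset")) {
      throw new IllegalArgumentException("Cannot set kafka property [auto.offset.reset]");
    }
    if (kafkaProperties.containsKey("enable.auto.commit")
        && !"false".equals(kafkaProperties.get("enable.auto.commit"))) {
      throw new IllegalArgumentException("Cannot set kafka property [enable.auto.commit]");
    }
    if (kafkaProperties.get("bootstrap.servers") == null) {
      throw new NullPointerException("bootstrap.servers required property");
    }
  }

  public static void main(String[] args)
  {
    // A valid configuration passes silently
    verify(Map.of("bootstrap.servers", "kafka.service:9092"));
    // A forbidden property (e.g. enable.auto.commit=true) would throw
  }
}
```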
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index 75dba4d442..6c21990bb5 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -398,6 +398,22 @@ public class KafkaLookupExtractorFactoryTest
Assert.assertTrue(factory.close());
}
+  @Test
+  public void testStartFailsOnAutoCommit()
+  {
+    final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+        cacheManager,
+        TOPIC,
+        ImmutableMap.of("enable.auto.commit", "true")
+    );
+    Assert.assertThrows(
+        "Cannot set kafka property [enable.auto.commit]. Property will be forced to [false]. Found [true]",
+        IAE.class,
+        () -> factory.start()
+    );
+    Assert.assertTrue(factory.close());
+  }
+
@Test
public void testFailsGetNotStarted()
{
diff --git a/website/.spelling b/website/.spelling
index 088d27f4d0..52ed82098c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -883,7 +883,11 @@ gcs-connector
hadoop2
hdfs
- ../docs/development/extensions-core/kafka-extraction-namespace.md
+Aotearoa
+Czechia
+KTable
LookupExtractorFactory
+Zeelund
zookeeper.connect
- ../docs/development/extensions-core/kafka-ingestion.md
0.11.x.
@@ -2110,3 +2114,7 @@ TIMESTAMPDIFF
TRUNC
VAR_POP
VAR_SAMP
+KTable
+Aotearoa
+Czechia
+Zeelund
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]