This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2855fb6ff8 Change Kafka Lookup Extractor to not register consumer 
group (#12842)
2855fb6ff8 is described below

commit 2855fb6ff8181d83fa679db26c8bbddf839391fd
Author: David Palmer <[email protected]>
AuthorDate: Tue Aug 9 22:44:22 2022 +1200

    Change Kafka Lookup Extractor to not register consumer group (#12842)
    
    * change kafka lookups module to not commit offsets
    
    The current behaviour of the Kafka lookup extractor is to not commit
    offsets by assigning a unique ID to the consumer group and setting
    auto.offset.reset to earliest. This does the job but also pollutes the
    Kafka broker with a bunch of "ghost" consumer groups that will never again 
be
    used.
    
    To fix this, we now set enable.auto.commit to false, which prevents the
    ghost consumer groups being created in the first place.
    
    * update docs to include new enable.auto.commit setting behaviour
    
    * update kafka-lookup-extractor documentation
    
    Provide some additional detail on functionality and configuration.
    Hopefully this will make it clearer how the extractor works for
    developers who aren't so familiar with Kafka.
    
    * add comments better explaining the logic of the code
    
    * add spelling exceptions for kafka lookup docs
---
 .../extensions-core/kafka-extraction-namespace.md  | 54 ++++++++++++++++------
 .../query/lookup/KafkaLookupExtractorFactory.java  | 19 +++++++-
 .../lookup/KafkaLookupExtractorFactoryTest.java    | 16 +++++++
 website/.spelling                                  |  8 ++++
 4 files changed, 82 insertions(+), 15 deletions(-)

diff --git a/docs/development/extensions-core/kafka-extraction-namespace.md 
b/docs/development/extensions-core/kafka-extraction-namespace.md
index 2c8a9c293b..93e6858ca4 100644
--- a/docs/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/development/extensions-core/kafka-extraction-namespace.md
@@ -32,22 +32,50 @@ If you need updates to populate as promptly as possible, it 
is possible to plug
 {
   "type":"kafka",
   "kafkaTopic":"testTopic",
-  "kafkaProperties":{"zookeeper.connect":"somehost:2181/kafka"}
+  "kafkaProperties":{
+    "bootstrap.servers":"kafka.service:9092"
+  }
 }
 ```
 
-|Parameter|Description|Required|Default|
-|---------|-----------|--------|-------|
-|`kafkaTopic`|The Kafka topic to read the data from|Yes||
-|`kafkaProperties`|Kafka consumer properties. At least"zookeeper.connect" must 
be specified. Only the zookeeper connector is supported|Yes||
-|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not 
wait)|
-|`isOneToOne`|The map is a one-to-one (see [Lookup 
DimensionSpecs](../../querying/dimensionspecs.md))|No|`false`|
+| Parameter         | Description                                              
                               | Required | Default           |
+|-------------------|-----------------------------------------------------------------------------------------|----------|-------------------|
+| `kafkaTopic`      | The Kafka topic to read the data from                    
                               | Yes      ||
+| `kafkaProperties` | Kafka consumer properties (`bootstrap.servers` must be 
specified)                       | Yes      ||
+| `connectTimeout`  | How long to wait for an initial connection               
                               | No       | `0` (do not wait) |
+| `isOneToOne`      | The map is a one-to-one (see [Lookup 
DimensionSpecs](../../querying/dimensionspecs.md)) | No       | `false`         
  |
 
-The extension `kafka-extraction-namespace` enables reading from a Kafka feed 
which has name/key pairs to allow renaming of dimension values. An example use 
case would be to rename an ID to a human readable format.
+The extension `kafka-extraction-namespace` enables reading from an [Apache 
Kafka](https://kafka.apache.org/) topic which has name/key pairs to allow 
renaming of dimension values. An example use case would be to rename an ID to a 
human-readable format.
 
-The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in 
`kafkaProperties` as they are set by the extension as 
`UUID.randomUUID().toString()` and `smallest` respectively.
+## How it Works
 
-See [lookups](../../querying/lookups.md) for how to configure and use lookups.
+The extractor works by consuming the configured Kafka topic from the 
beginning, and appending every record to an internal map. The key of the Kafka 
record is used as they key of the map, and the payload of the record is used as 
the value. At query time, a lookup can be used to transform the key into the 
associated value. See [lookups](../../querying/lookups.md) for how to configure 
and use lookups in a query. Keys and values are both stored as strings by the 
lookup extractor.
+
+The extractor remains subscribed to the topic, so new records are added to the 
lookup map as they appear. This allows for lookup values to be updated in 
near-realtime. If two records are added to the topic with the same key, the 
record with the larger offset will replace the previous record in the lookup 
map. A record with a `null` payload will be treated as a tombstone record, and 
the associated key will be removed from the lookup map.
+
+The extractor treats the input topic much like a 
[KTable](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KTable.html).
 As such, it is best to create your Kafka topic using a [log 
compaction](https://kafka.apache.org/documentation/#compaction) strategy, so 
that the most-recent version of a key is always preserved in Kafka. Without 
properly configuring retention and log compaction, older keys that are 
automatically removed from Kafka will not be available and will be  [...]
+
+### Example
+
+Consider a `country_codes` topic is being consumed, and the following records 
are added to the topic in the following order:
+
+| Offset | Key | Payload     |
+|--------|-----|-------------|
+| 1      | NZ  | Nu Zeelund  |
+| 2      | AU  | Australia   |
+| 3      | NZ  | New Zealand |
+| 4      | AU  | `null`      |
+| 5      | NZ  | Aotearoa    |
+| 6      | CZ  | Czechia     |
+
+This input topic would be consumed from the beginning, and result in a lookup 
namespace containing the following mappings (notice that the entry for 
_Australia_ was added and then deleted):
+
+| Key | Value     |
+|-----|-----------|
+| NZ  | Aotearoa  |
+| CZ  | Czechia   |
+
+Now when a query uses this extraction namespace, the country codes can be 
mapped to the full country name at query time.
 
 ## Tombstones and Deleting Records
 
@@ -55,9 +83,9 @@ The Kafka lookup extractor treats `null` Kafka messages as 
tombstones. This mean
 
 ## Limitations
 
-Currently the Kafka lookup extractor feeds the entire Kafka stream into a 
local cache. If you are using on-heap caching, this can easily clobber your 
java heap if the Kafka stream spews a lot of unique keys.
-off-heap caching should alleviate these concerns, but there is still a limit 
to the quantity of data that can be stored.
-There is currently no eviction policy.
+The consumer properties `group.id`, `auto.offset.reset` and 
`enable.auto.commit` cannot be set in `kafkaProperties` as they are set by the 
extension as `UUID.randomUUID().toString()`, `earliest` and `false` 
respectively. This is because the entire topic must be consumed by the Druid 
service from the very beginning so that a complete map of lookup values can be 
built. Setting any of these consumer properties will cause the extractor to not 
start.
+
+Currently, the Kafka lookup extractor feeds the entire Kafka topic into a 
local cache. If you are using on-heap caching, this can easily clobber your 
java heap if the Kafka stream spews a lot of unique keys. Off-heap caching 
should alleviate these concerns, but there is still a limit to the quantity of 
data that can be stored.  There is currently no eviction policy.
 
 ## Testing the Kafka rename functionality
 
diff --git 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index e0bd3e082c..6d5d393a0b 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -357,6 +357,13 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
     return future;
   }
 
+  /**
+   * Check that the user has not set forbidden Kafka consumer props
+   *
+   * Some consumer properties must be set in order to guarantee that
+   * the consumer will consume the entire topic from the beginning.
+   * Otherwise, lookup data may not be loaded completely.
+   */
   private void verifyKafkaProperties()
   {
     if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
@@ -367,10 +374,17 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
     }
     if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
       throw new IAE(
-              "Cannot set kafka property [auto.offset.reset]. Property will be 
forced to [smallest]. Found [%s]",
+              "Cannot set kafka property [auto.offset.reset]. Property will be 
forced to [earliest]. Found [%s]",
               kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
       );
     }
+    if (kafkaProperties.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) 
&&
+          
!kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) 
{
+      throw new IAE(
+          "Cannot set kafka property [enable.auto.commit]. Property will be 
forced to [false]. Found [%s]",
+          kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+      );
+    }
     Preconditions.checkNotNull(
             kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
             "bootstrap.servers required property"
@@ -398,9 +412,10 @@ public class KafkaLookupExtractorFactory implements 
LookupExtractorFactory
   {
     final Properties properties = new Properties();
     properties.putAll(kafkaProperties);
-    // Enable publish-subscribe
+    // Set the consumer to consume everything and never commit offsets
     properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
+    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     return properties;
   }
 }
diff --git 
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
 
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index 75dba4d442..6c21990bb5 100644
--- 
a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ 
b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -398,6 +398,22 @@ public class KafkaLookupExtractorFactoryTest
     Assert.assertTrue(factory.close());
   }
 
+  @Test
+  public void testStartFailsOnAutoCommit()
+  {
+    final KafkaLookupExtractorFactory factory = new 
KafkaLookupExtractorFactory(
+        cacheManager,
+        TOPIC,
+        ImmutableMap.of("enable.auto.commit", "true")
+    );
+    Assert.assertThrows(
+        "Cannot set kafka property [enable.auto.commit]. Property will be 
forced to [false]. Found [true]",
+        IAE.class,
+        () -> factory.start()
+    );
+    Assert.assertTrue(factory.close());
+  }
+
   @Test
   public void testFailsGetNotStarted()
   {
diff --git a/website/.spelling b/website/.spelling
index 088d27f4d0..52ed82098c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -883,7 +883,11 @@ gcs-connector
 hadoop2
 hdfs
  - ../docs/development/extensions-core/kafka-extraction-namespace.md
+Aotearoa
+Czechia
+KTable
 LookupExtractorFactory
+Zeelund
 zookeeper.connect
  - ../docs/development/extensions-core/kafka-ingestion.md
 0.11.x.
@@ -2110,3 +2114,7 @@ TIMESTAMPDIFF
 TRUNC 
 VAR_POP 
 VAR_SAMP
+KTable
+Aotearoa
+Czechia
+Zeelund
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to