This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new abd7a9748d Remove kafka lookup records when a record is tombstoned (#12819)
abd7a9748d is described below

commit abd7a9748d84b536bcabe44af655c5d37826ae15
Author: Hamish Ball <[email protected]>
AuthorDate: Tue Aug 9 17:12:51 2022 +1200

    Remove kafka lookup records when a record is tombstoned (#12819)
    
    * remove kafka lookup records from factory when record tombstoned
    
    * update kafka lookup docs to include tombstone behaviour
    
    * change test wait time down to 10ms
    
    Co-authored-by: David Palmer <[email protected]>
---
 .../extensions-core/kafka-extraction-namespace.md  |  4 ++
 .../query/lookup/KafkaLookupExtractorFactory.java  | 11 +++-
 .../query/lookup/TestKafkaExtractionCluster.java   | 72 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/docs/development/extensions-core/kafka-extraction-namespace.md b/docs/development/extensions-core/kafka-extraction-namespace.md
index c5f7020be6..2c8a9c293b 100644
--- a/docs/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/development/extensions-core/kafka-extraction-namespace.md
@@ -49,6 +49,10 @@ The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kaf
 
 See [lookups](../../querying/lookups.md) for how to configure and use lookups.
 
+## Tombstones and Deleting Records
+
+The Kafka lookup extractor treats `null` Kafka messages as tombstones. This means that a record on the input topic with a `null` message payload on Kafka will remove the associated key from the lookup map, effectively deleting it.
+
 ## Limitations
 
 Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys.
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index f3d2c1eb1c..e0bd3e082c 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -176,8 +176,15 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
               for (final ConsumerRecord<String, String> record : records) {
                 final String key = record.key();
                 final String message = record.value();
-                if (key == null || message == null) {
-                  LOG.error("Bad key/message from topic [%s]: [%s]", topic, record);
+                if (key == null) {
+                  LOG.error("Bad key from topic [%s]: [%s]", topic, record);
+                  continue;
+                }
+                if (message == null) {
+                  LOG.trace("Removed key[%s] val[%s]", key, message);
+                  doubleEventCount.incrementAndGet();
+                  map.remove(key);
+                  doubleEventCount.incrementAndGet();
                   continue;
                 }
                 doubleEventCount.incrementAndGet();
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index 1b74419520..4903cbd3b2 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -265,6 +265,78 @@ public class TestKafkaExtractionCluster
     }
   }
 
+  @Test(timeout = 60_000L)
+  public void testLookupWithTombstone() throws Exception
+  {
+    try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
+      checkServer();
+
+      assertUpdated(null, "foo");
+      assertReverseUpdated(ImmutableList.of(), "foo");
+
+      long events = factory.getCompletedEventCount();
+
+      log.info("-------------------------     Sending foo bar     -------------------------------");
+      producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
+
+      long start = System.currentTimeMillis();
+      while (events == factory.getCompletedEventCount()) {
+        Thread.sleep(10);
+        if (System.currentTimeMillis() > start + 60_000) {
+          throw new ISE("Took too long to update event");
+        }
+      }
+
+      log.info("-------------------------     Checking foo bar     -------------------------------");
+      assertUpdated("bar", "foo");
+      assertReverseUpdated(Collections.singletonList("foo"), "bar");
+
+      checkServer();
+      events = factory.getCompletedEventCount();
+
+      log.info("-----------------------     Sending foo tombstone     -----------------------------");
+      producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null));
+      while (events == factory.getCompletedEventCount()) {
+        Thread.sleep(10);
+        if (System.currentTimeMillis() > start + 60_000) {
+          throw new ISE("Took too long to update event");
+        }
+      }
+
+      log.info("-----------------------     Checking foo removed     -----------------------------");
+      assertUpdated(null, "foo");
+      assertReverseUpdated(ImmutableList.of(), "foo");
+    }
+  }
+
+  @Test(timeout = 60_000L)
+  public void testLookupWithInitTombstone() throws Exception
+  {
+    try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
+      checkServer();
+
+      assertUpdated(null, "foo");
+      assertReverseUpdated(ImmutableList.of(), "foo");
+
+      long events = factory.getCompletedEventCount();
+
+      long start = System.currentTimeMillis();
+
+      log.info("-----------------------     Sending foo tombstone     -----------------------------");
+      producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), null));
+      while (events == factory.getCompletedEventCount()) {
+        Thread.sleep(10);
+        if (System.currentTimeMillis() > start + 60_000) {
+          throw new ISE("Took too long to update event");
+        }
+      }
+
+      log.info("-----------------------     Checking foo removed     -----------------------------");
+      assertUpdated(null, "foo");
+      assertReverseUpdated(ImmutableList.of(), "foo");
+    }
+  }
+
   private void assertUpdated(
       String expected,
       String key


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to