This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new abd7a9748d Remove kafka lookup records when a record is tombstoned (#12819)
abd7a9748d is described below
commit abd7a9748d84b536bcabe44af655c5d37826ae15
Author: Hamish Ball <[email protected]>
AuthorDate: Tue Aug 9 17:12:51 2022 +1200
Remove kafka lookup records when a record is tombstoned (#12819)
* remove kafka lookup records from factory when record tombstoned
* update kafka lookup docs to include tombstone behaviour
* change test wait time down to 10ms
Co-authored-by: David Palmer <[email protected]>
---
.../extensions-core/kafka-extraction-namespace.md | 4 ++
.../query/lookup/KafkaLookupExtractorFactory.java | 11 +++-
.../query/lookup/TestKafkaExtractionCluster.java | 72 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 2 deletions(-)
diff --git a/docs/development/extensions-core/kafka-extraction-namespace.md b/docs/development/extensions-core/kafka-extraction-namespace.md
index c5f7020be6..2c8a9c293b 100644
--- a/docs/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/development/extensions-core/kafka-extraction-namespace.md
@@ -49,6 +49,10 @@ The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kaf
See [lookups](../../querying/lookups.md) for how to configure and use lookups.
+## Tombstones and Deleting Records
+
+The Kafka lookup extractor treats `null` Kafka messages as tombstones. This means that a record on the input topic with a `null` message payload on Kafka will remove the associated key from the lookup map, effectively deleting it.
+
## Limitations
Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys.
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
index f3d2c1eb1c..e0bd3e082c 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -176,8 +176,15 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
for (final ConsumerRecord<String, String> record : records) {
final String key = record.key();
final String message = record.value();
- if (key == null || message == null) {
- LOG.error("Bad key/message from topic [%s]: [%s]", topic,
record);
+ if (key == null) {
+ LOG.error("Bad key from topic [%s]: [%s]", topic, record);
+ continue;
+ }
+ if (message == null) {
+ LOG.trace("Removed key[%s] val[%s]", key, message);
+ doubleEventCount.incrementAndGet();
+ map.remove(key);
+ doubleEventCount.incrementAndGet();
continue;
}
doubleEventCount.incrementAndGet();
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
index 1b74419520..4903cbd3b2 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -265,6 +265,78 @@ public class TestKafkaExtractionCluster
}
}
+ @Test(timeout = 60_000L)
+ public void testLookupWithTombstone() throws Exception
+ {
+ try (final Producer<byte[], byte[]> producer = new
KafkaProducer(makeProducerProperties())) {
+ checkServer();
+
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.of(), "foo");
+
+ long events = factory.getCompletedEventCount();
+
+ log.info("------------------------- Sending foo bar
-------------------------------");
+ producer.send(new ProducerRecord<>(TOPIC_NAME,
StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
+
+ long start = System.currentTimeMillis();
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(10);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
+ }
+
+ log.info("------------------------- Checking foo bar
-------------------------------");
+ assertUpdated("bar", "foo");
+ assertReverseUpdated(Collections.singletonList("foo"), "bar");
+
+ checkServer();
+ events = factory.getCompletedEventCount();
+
+ log.info("----------------------- Sending foo tombstone
-----------------------------");
+ producer.send(new ProducerRecord<>(TOPIC_NAME,
StringUtils.toUtf8("foo"), null));
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(10);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
+ }
+
+ log.info("----------------------- Checking foo removed
-----------------------------");
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.of(), "foo");
+ }
+ }
+
+ @Test(timeout = 60_000L)
+ public void testLookupWithInitTombstone() throws Exception
+ {
+ try (final Producer<byte[], byte[]> producer = new
KafkaProducer(makeProducerProperties())) {
+ checkServer();
+
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.of(), "foo");
+
+ long events = factory.getCompletedEventCount();
+
+ long start = System.currentTimeMillis();
+
+ log.info("----------------------- Sending foo tombstone
-----------------------------");
+ producer.send(new ProducerRecord<>(TOPIC_NAME,
StringUtils.toUtf8("foo"), null));
+ while (events == factory.getCompletedEventCount()) {
+ Thread.sleep(10);
+ if (System.currentTimeMillis() > start + 60_000) {
+ throw new ISE("Took too long to update event");
+ }
+ }
+
+ log.info("----------------------- Checking foo removed
-----------------------------");
+ assertUpdated(null, "foo");
+ assertReverseUpdated(ImmutableList.of(), "foo");
+ }
+ }
+
private void assertUpdated(
String expected,
String key
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]