Repository: metron Updated Branches: refs/heads/master cbdaee174 -> 7af11b626
METRON-1656 Create KAFKA_SEEK function (nickwallen) closes apache/metron#1097 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7af11b62 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7af11b62 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7af11b62 Branch: refs/heads/master Commit: 7af11b626e2d30aaaf7c01c364295b0b407fae49 Parents: cbdaee1 Author: nickwallen <n...@nickallen.org> Authored: Wed Jul 11 11:10:06 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed Jul 11 11:10:06 2018 -0400 ---------------------------------------------------------------------- .../metron/management/KafkaFunctions.java | 109 +++++++++++++++++++ .../KafkaFunctionsIntegrationTest.java | 92 ++++++++++++++++ 2 files changed, 201 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index 7c9c23f..76418b6 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -763,6 +764,114 @@ public class KafkaFunctions { } /** + * KAFKA_SEEK + * + * <p>Seeks to a specific offset and returns the message. 
+ * + * <p>Example: Find the message in 'topic', partition 1, offset 1001. + * <pre> + * {@code + * KAFKA_SEEK('topic', 1, 1001) + * } + * </pre> + * + * <p>By default, only the message value is returned. By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. + */ + @Stellar( + namespace = "KAFKA", + name = "SEEK", + description = "Seeks to an offset within a topic and returns the message.", + params = { + "topic - The name of the Kafka topic", + "partition - The partition identifier; starts at 0.", + "offset - The offset within the partition; starts at 0.", + "config - Optional map of key/values that override any global properties." + }, + returns = "The message at the given offset, if the offset exists. Otherwise, returns null." + ) + public static class KafkaSeek implements StellarFunction { + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + // required - the topic, partition, and offset are all required + String topic = getArg("topic", 0, String.class, args); + int partition = getArg("partition", 1, Integer.class, args); + int offset = getArg("offset", 2, Integer.class, args); + + // optional - property overrides provided by the user + Map<String, String> overrides = new HashMap<>(); + if(args.size() > 3) { + overrides = getArg("overrides", 3, Map.class, args); + } + + Properties properties = buildKafkaProperties(overrides, context); + return seek(topic, partition, offset, properties); + } + + /** + * Seeks to an offset within a topic partition and retrieves the message at that offset. + * + * @param topic The kafka topic. + * @param partition The partition identifier. + * @param offset The offset within the given partition. 
+ * @param properties Function configuration values. + * @return The rendered message at the given offset, or null if no such message was found before the max wait time elapsed. + */ + private Object seek(String topic, int partition, int offset, Properties properties) { + final int pollTimeout = getPollTimeout(properties); + final int maxWait = getMaxWait(properties); + Object message = null; + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { + + // continue until we have the message or exceeded the max wait time + long wait = 0L; + final long start = clock.currentTimeMillis(); + while(message == null && wait < maxWait) { + + // seek to the offset + TopicPartition topar = new TopicPartition(topic, partition); + consumer.assign(Collections.singletonList(topar)); + consumer.seek(topar, offset); + + // poll kafka for messages + for(ConsumerRecord<String, String> record : consumer.poll(pollTimeout)) { + + // kafka will attempt to be helpful and return a message, even if the actual offset does not exist + if(record.offset() == offset && record.partition() == partition) { + LOG.debug("KAFKA_SEEK found message; topic={}, partition={}, offset={}", topic, partition, offset); + message = render(record, properties); + } + } + + // how long have we waited? + wait = clock.currentTimeMillis() - start; + if(LOG.isDebugEnabled() && message == null) { + LOG.debug("KAFKA_SEEK no message yet; topic={}, partition={}, offset={}, waitTime={} ms", + topic, partition, offset, wait); + } + } + } + + return message; + } + + @Override + public void initialize(Context context) { + // no initialization required + } + + @Override + public boolean isInitialized() { + // no initialization required + return true; + } + } + + /** * Renders the Kafka record into a view. 
* * <p>A user can customize the way in which a Kafka record is rendered by altering http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index 5e045ad..09bce17 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -113,6 +113,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { .withClass(KafkaFunctions.KafkaProps.class) .withClass(KafkaFunctions.KafkaTail.class) .withClass(KafkaFunctions.KafkaFind.class) + .withClass(KafkaFunctions.KafkaSeek.class) .withClass(MapFunctions.MapGet.class); } @@ -593,6 +594,97 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_SEEK should return the message at a given offset. 
+ */ + @Test + public void testKafkaSeek() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put 3 messages into the topic + run("KAFKA_PUT(topic, [ message1, message2, message3 ])"); + { + // get the 3rd message from the topic + Object actual = run("KAFKA_SEEK(topic, 0, 2)"); + assertEquals(message3, actual); + } + { + // get the 2nd message from the topic + Object actual = run("KAFKA_SEEK(topic, 0, 1)"); + assertEquals(message2, actual); + } + { + // get the 1st message from the topic + Object actual = run("KAFKA_SEEK(topic, 0, 0)"); + assertEquals(message1, actual); + } + } + + /** + * KAFKA_SEEK should return null if the offset does not exist + */ + @Test + public void testKafkaSeekToMissingOffset() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put 3 messages into the topic + run("KAFKA_PUT(topic, [ message1, message2, message3 ])"); + + // seek to an offset that does not exist in the topic + Object actual = run("KAFKA_SEEK(topic, 0, 9999)"); + assertNull(actual); + } + + /** + * KAFKA_SEEK should return null if the partition does not exist + */ + @Test + public void testKafkaSeekToMissingPartition() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put 3 messages into the topic + run("KAFKA_PUT(topic, [ message1, message2, message3 ])"); + + // seek to a partition that does not exist in the topic + Object actual = run("KAFKA_SEEK(topic, 99999, 0)"); + assertNull(actual); + } + + /** + * KAFKA_SEEK should allow a user to see a detailed view of each Kafka record. 
+ */ + @Test + public void testKafkaSeekWithRichView() throws Exception { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + run("KAFKA_PUT(topic, [ message1, message2, message3 ])"); + Object actual = run("KAFKA_SEEK(topic, 0, 0)"); + + // expect a 'rich' view of the record + assertTrue(actual instanceof Map); + Map<String, Object> view = (Map) actual; + assertNull(view.get("key")); + assertNotNull(view.get("offset")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message1, view.get("value")); + } + + /** * Runs a Stellar expression. * @param expression The expression to run. */