Repository: metron
Updated Branches:
  refs/heads/master cbdaee174 -> 7af11b626


METRON-1656 Create KAFKA_SEEK function (nickwallen) closes apache/metron#1097


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7af11b62
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7af11b62
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7af11b62

Branch: refs/heads/master
Commit: 7af11b626e2d30aaaf7c01c364295b0b407fae49
Parents: cbdaee1
Author: nickwallen <n...@nickallen.org>
Authored: Wed Jul 11 11:10:06 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Wed Jul 11 11:10:06 2018 -0400

----------------------------------------------------------------------
 .../metron/management/KafkaFunctions.java       | 109 +++++++++++++++++++
 .../KafkaFunctionsIntegrationTest.java          |  92 ++++++++++++++++
 2 files changed, 201 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 7c9c23f..76418b6 100644
--- 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -763,6 +764,114 @@ public class KafkaFunctions {
   }
 
   /**
+   * KAFKA_SEEK
+   *
+   * <p>Seeks to a specific offset and returns the message.
+   *
+   * <p>Example: Find the message in 'topic', partition 1, offset 1001.
+   * <pre>
+   * {@code
+   * KAFKA_SEEK('topic', 1, 1001)
+   * }
+   * </pre>
+   *
+   * <p>By default, only the message value is returned. By setting the global 
property
+   * 'stellar.kafka.message.view' = 'rich' the function will return additional 
Kafka metadata
+   * including the topic, partition, offset, key, and timestamp contained in a 
map. Setting
+   * this property value to 'simple' or simply not setting the property value, 
will result
+   * in the default view behavior.
+   */
+  @Stellar(
+          namespace = "KAFKA",
+          name = "SEEK",
+          description = "Seeks to an offset within a topic and returns the 
message.",
+          params = {
+                  "topic - The name of the Kafka topic",
+                  "partition - The partition identifier; starts at 0.",
+                  "offset - The offset within the partition; starts at 0.",
+                  "config - Optional map of key/values that override any 
global properties."
+          },
+          returns = "The message at the given offset, if the offset exists. 
Otherwise, returns null."
+  )
+  public static class KafkaSeek implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      // required - the topic, partition, and offset are all required
+      String topic = getArg("topic", 0, String.class, args);
+      int partition = getArg("partition", 1, Integer.class, args);
+      int offset = getArg("offset", 2, Integer.class, args);
+
+      // optional - property overrides provided by the user
+      Map<String, String> overrides = new HashMap<>();
+      if(args.size() > 3) {
+        overrides = getArg("overrides", 3, Map.class, args);
+      }
+
+      Properties properties = buildKafkaProperties(overrides, context);
+      return seek(topic, partition, offset, properties);
+    }
+
+    /**
+     * Seeks to an offset within a topic partition and returns the message found there.
+     *
+     * @param topic The kafka topic.
+     * @param partition The partition identifier.
+     * @param offset The offset within the given partition.
+     * @param properties Function configuration values.
+     * @return The rendered message at the given offset, or null if it was not found within the max wait time.
+     */
+    private Object seek(String topic, int partition, int offset, Properties 
properties) {
+      final int pollTimeout = getPollTimeout(properties);
+      final int maxWait = getMaxWait(properties);
+      Object message = null;
+      try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(properties)) {
+
+        // continue until we have the message or exceeded the max wait time
+        long wait = 0L;
+        final long start = clock.currentTimeMillis();
+        while(message == null && wait < maxWait) {
+
+          // seek to the offset
+          TopicPartition topar = new TopicPartition(topic, partition);
+          consumer.assign(Collections.singletonList(topar));
+          consumer.seek(topar, offset);
+
+          // poll kafka for messages
+          for(ConsumerRecord<String, String> record : 
consumer.poll(pollTimeout)) {
+
+            // kafka will attempt to be helpful and return a message, even if 
the actual offset does not exist
+            if(record.offset() == offset && record.partition() == partition) {
+              LOG.debug("KAFKA_SEEK found message; topic={}, partition={}, 
offset={}", topic, partition, offset);
+              message = render(record, properties);
+            }
+          }
+
+          // how long have we waited?
+          wait = clock.currentTimeMillis() - start;
+          if(LOG.isDebugEnabled() && message == null) {
+            LOG.debug("KAFKA_SEEK no message yet; topic={}, partition={}, 
offset={}, waitTime={} ms",
+                    topic, partition, offset, wait);
+          }
+        }
+      }
+
+      return message;
+    }
+
+    @Override
+    public void initialize(Context context) {
+      // no initialization required
+    }
+
+    @Override
+    public boolean isInitialized() {
+      // no initialization required
+      return true;
+    }
+  }
+
+  /**
    * Renders the Kafka record into a view.
    *
    * <p>A user can customize the way in which a Kafka record is rendered by 
altering

http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index 5e045ad..09bce17 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -113,6 +113,7 @@ public class KafkaFunctionsIntegrationTest extends 
BaseIntegrationTest {
             .withClass(KafkaFunctions.KafkaProps.class)
             .withClass(KafkaFunctions.KafkaTail.class)
             .withClass(KafkaFunctions.KafkaFind.class)
+            .withClass(KafkaFunctions.KafkaSeek.class)
             .withClass(MapFunctions.MapGet.class);
   }
 
@@ -593,6 +594,97 @@ public class KafkaFunctionsIntegrationTest extends 
BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_SEEK should return the message at a given offset.
+   */
+  @Test
+  public void testKafkaSeek() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put 3 messages into the topic
+    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+    {
+      // get the 3rd message from the topic
+      Object actual = run("KAFKA_SEEK(topic, 0, 2)");
+      assertEquals(message3, actual);
+    }
+    {
+      // get the 2nd message from the topic
+      Object actual = run("KAFKA_SEEK(topic, 0, 1)");
+      assertEquals(message2, actual);
+    }
+    {
+      // get the 1st message from the topic
+      Object actual = run("KAFKA_SEEK(topic, 0, 0)");
+      assertEquals(message1, actual);
+    }
+  }
+
+  /**
+   * KAFKA_SEEK should return null if the offset does not exist
+   */
+  @Test
+  public void testKafkaSeekToMissingOffset() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put 3 messages into the topic
+    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+
+    // seek to an offset that does not exist in the topic
+    Object actual = run("KAFKA_SEEK(topic, 0, 9999)");
+    assertNull(actual);
+  }
+
+  /**
+   * KAFKA_SEEK should return null if the partition does not exist
+   */
+  @Test
+  public void testKafkaSeekToMissingPartition() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put 3 messages into the topic
+    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+
+    // seek to a partition that does not exist in the topic
+    Object actual = run("KAFKA_SEEK(topic, 99999, 0)");
+    assertNull(actual);
+  }
+
+  /**
+   * KAFKA_SEEK should allow a user to see a detailed view of each Kafka 
record.
+   */
+  @Test
+  public void testKafkaSeekWithRichView() throws Exception {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, 
KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+    Object actual = run("KAFKA_SEEK(topic, 0, 0)");
+
+    // expect a 'rich' view of the record
+    assertTrue(actual instanceof Map);
+    Map<String, Object> view = (Map) actual;
+    assertNull(view.get("key"));
+    assertNotNull(view.get("offset"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message1, view.get("value"));
+  }
+
+  /**
    * Runs a Stellar expression.
    * @param expression The expression to run.
    */

Reply via email to