Repository: metron Updated Branches: refs/heads/feature/METRON-1416-upgrade-solr 975923e8d -> 1767727a7
METRON-1573 Enhance KAFKA_* functions to return partition and offset details (nickwallen) closes apache/metron#1030 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/0bb35800 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/0bb35800 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/0bb35800 Branch: refs/heads/feature/METRON-1416-upgrade-solr Commit: 0bb358009f1823fd19682ddb18d00aefa1441bf6 Parents: b081e80 Author: nickwallen <[email protected]> Authored: Mon Jun 18 11:29:53 2018 -0400 Committer: nickallen <[email protected]> Committed: Mon Jun 18 11:29:53 2018 -0400 ---------------------------------------------------------------------- .../metron/management/KafkaFunctions.java | 173 +++++++++++++++++-- .../KafkaFunctionsIntegrationTest.java | 165 +++++++++++++++++- 2 files changed, 322 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/0bb35800/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index f256672..7c9c23f 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -18,6 +18,7 @@ package org.apache.metron.management; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -30,7 +31,6 @@ import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.metron.common.system.Clock; -import org.apache.metron.profiler.client.stellar.Util; import org.apache.metron.stellar.common.LambdaExpression; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.common.utils.JSONUtils; @@ -66,6 +66,7 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG; * KAFKA_GET * KAFKA_PUT * KAFKA_TAIL + * KAFKA_FIND * KAFKA_PROPS */ public class KafkaFunctions { @@ -98,6 +99,30 @@ public class KafkaFunctions { public static final int DEFAULT_MAX_WAIT = 5000; /** + * The key for the global property that defines how a message is returned + * from the set of KAFKA functions. + * + * <p>simple - The result contains only the message value as a string. + * <p>rich - The result contains the message value, topic, partition, and offset. + */ + public static final String MESSAGE_VIEW_PROPERTY = "stellar.kafka.message.view"; + + /** + * An acceptable value for the 'stellar.kafka.message.view' property. The result + * provided will contain only the message value as a string. + */ + public static final String MESSAGE_VIEW_SIMPLE = "simple"; + + /** + * An acceptable value for the 'stellar.kafka.message.view' property. + * + * <p>Provides a view of each message with more detailed metadata beyond just the + * message value. The result provided will contain the message value, topic, partition, + * and offset. + */ + public static final String MESSAGE_VIEW_RICH = "rich"; + + /** * The default set of Kafka properties. */ private static Properties defaultProperties = defaultKafkaProperties(); @@ -137,6 +162,12 @@ public class KafkaFunctions { * KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" }) * } * </pre> + * + * <p>By default, only the message value is returned. 
By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -202,7 +233,8 @@ public class KafkaFunctions { while(messages.size() < count && wait < maxWait) { for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { - messages.add(record.value()); + Object viewOfMessage = render(record, properties); + messages.add(viewOfMessage); } // how long have we waited? @@ -247,6 +279,12 @@ public class KafkaFunctions { * KAFKA_TAIL('topic', 10) * } * </pre> + * + * <p>By default, only the message value is returned. By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -312,7 +350,8 @@ public class KafkaFunctions { while(messages.size() < count && wait < maxWait) { for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { - messages.add(record.value()); + Object viewOfMessage = render(record, properties); + messages.add(viewOfMessage); } // how long have we waited? 
@@ -357,6 +396,7 @@ public class KafkaFunctions { * KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" }) * } * </pre> + * */ @Stellar( namespace = "KAFKA", @@ -394,9 +434,49 @@ public class KafkaFunctions { // send the messages Properties properties = buildKafkaProperties(overrides, context); - putMessages(topic, messages, properties); + List<RecordMetadata> records = putMessages(topic, messages, properties); - return null; + // render a view of the messages that were written for the user + Object view = render(records, properties); + return view; + } + + /** + * Render a view of the {@link RecordMetadata} that resulted from writing + * messages to Kafka. + * + * @param records The record metadata. + * @param properties The properties. + * @return + */ + private Object render(List<RecordMetadata> records, Properties properties) { + + Object view; + if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) { + + // build a 'rich' view of the messages that were written + List<Object> responses = new ArrayList<>(); + for(RecordMetadata record: records) { + + // render the 'rich' view of the record + Map<String, Object> richView = new HashMap<>(); + richView.put("topic", record.topic()); + richView.put("partition", record.partition()); + richView.put("offset", record.offset()); + richView.put("timestamp", record.timestamp()); + + responses.add(richView); + } + + // the rich view is a list of maps containing metadata about how each message was written + view = responses; + + } else { + + // otherwise, the view is simply a count of the number of messages written + view = CollectionUtils.size(records); + } + return view; } /** @@ -407,9 +487,11 @@ public class KafkaFunctions { * @param topic The topic to send messages to. * @param messages The messages to send. * @param properties The properties to use with Kafka. + * @return Metadata about all the records written to Kafka. 
*/ - private void putMessages(String topic, List<String> messages, Properties properties) { + private List<RecordMetadata> putMessages(String topic, List<String> messages, Properties properties) { LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size()); + List<RecordMetadata> records = new ArrayList<>(); try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) { List<Future<RecordMetadata>> futures = new ArrayList<>(); @@ -422,11 +504,14 @@ public class KafkaFunctions { // wait for the sends to complete for(Future<RecordMetadata> future : futures) { - waitForResponse(future, properties); + RecordMetadata record = waitForResponse(future, properties); + records.add(record); } producer.flush(); } + + return records; } /** @@ -434,19 +519,23 @@ public class KafkaFunctions { * * @param future The future for the message being sent. * @param properties The configuration properties. - * @return + * @return Metadata about the record that was written to Kafka. */ - private void waitForResponse(Future<RecordMetadata> future, Properties properties) { + private RecordMetadata waitForResponse(Future<RecordMetadata> future, Properties properties) { + RecordMetadata record = null; int maxWait = getMaxWait(properties); + try { // wait for the record and then render it for the user - RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS); + record = future.get(maxWait, TimeUnit.MILLISECONDS); LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}", record.topic(), record.partition(), record.offset()); } catch(TimeoutException | InterruptedException | ExecutionException e) { LOG.error("KAFKA_PUT message send failure", e); } + + return record; } @Override @@ -528,6 +617,12 @@ public class KafkaFunctions { * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10) * } * </pre> + * + * <p>By default, only the message value is returned. 
By setting the global property + * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata + * including the topic, partition, offset, key, and timestamp contained in a map. Setting + * this property value to 'simple' or simply not setting the property value, will result + * in the default view behavior. */ @Stellar( namespace = "KAFKA", @@ -601,7 +696,8 @@ public class KafkaFunctions { // only keep the message if the filter expression is satisfied if(isSatisfied(filter, record.value())) { - messages.add(record.value()); + Object view = render(record, properties); + messages.add(view); // do we have enough messages already? if(messages.size() >= count) { @@ -667,6 +763,41 @@ public class KafkaFunctions { } /** + * Renders the Kafka record into a view. + * + * <p>A user can customize the way in which a Kafka record is rendered by altering + * the "stellar.kafka.message.view" property. + * + * @param record The Kafka record to render. + * @param properties The properties which allows a user to customize the rendered view of a record. + * @return + */ + private static Object render(ConsumerRecord<String, String> record, Properties properties) { + LOG.debug("Render message; topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + + Object result; + if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) { + // build the detailed view of the record + Map<String, Object> view = new HashMap<>(); + view.put("value", record.value()); + view.put("topic", record.topic()); + view.put("partition", record.partition()); + view.put("offset", record.offset()); + view.put("timestamp", record.timestamp()); + view.put("key", record.key()); + + result = view; + + } else { + // default to the simple view + result = record.value(); + } + + return result; + } + + /** * Manually assigns all partitions in a topic to a consumer * * @param topic The topic whose partitions will be assigned. 
@@ -756,6 +887,23 @@ public class KafkaFunctions { } /** + * Determines how Kafka messages should be rendered for the user. + * + * @param properties The properties. + * @return How the Kafka messages should be rendered. + */ + private static String getMessageView(Properties properties) { + // defaults to the simple view + String messageView = MESSAGE_VIEW_SIMPLE; + + if(properties.containsKey(MESSAGE_VIEW_PROPERTY)) { + messageView = ConversionUtils.convert(properties.get(MESSAGE_VIEW_PROPERTY), String.class); + } + + return messageView; + } + + /** * Defines a minimal set of default parameters that can be overridden * via the global properties. */ @@ -792,6 +940,9 @@ public class KafkaFunctions { // set the default poll timeout properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT); + // set the default message view + properties.put(MESSAGE_VIEW_PROPERTY, MESSAGE_VIEW_SIMPLE); + return properties; } http://git-wip-us.apache.org/repos/asf/metron/blob/0bb35800/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index d82bb37..5e045ad 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -48,6 +48,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -153,7 +155,45 @@ public class 
KafkaFunctionsIntegrationTest extends BaseIntegrationTest { variables.put("topic", topicName); // put a message onto the topic - run("KAFKA_PUT(topic, [message1])"); + assertEquals(1, run("KAFKA_PUT(topic, [message1])")); + + // validate the message in the topic + assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic)")); + } + + /** + * KAFKA_PUT should be able to write multiple messages to a topic. + */ + @Test + public void testKafkaPutMultipleMessages() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic + assertEquals(2, run("KAFKA_PUT(topic, [message1, message2])")); + + // validate the messages in the topic + List<String> expected = new ArrayList<String>() {{ + add(message1); + add(message2); + }}; + assertEquals(expected, run("KAFKA_GET(topic, 2)")); + } + + /** + * KAFKA_PUT should be able to write a message passed as a String, rather than a List. + */ + @Test + public void testKafkaPutOneMessagePassedAsString() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic - the message is just a string, not a list + run("KAFKA_PUT(topic, message1)"); // get a message from the topic Object actual = run("KAFKA_GET(topic)"); @@ -166,7 +206,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { * KAFKA_PUT should be able to write a message passed as a String, rather than a List. 
*/ @Test - public void testKafkaPutOneMessagePassedAsString() { + public void testKafkaPutWithRichView() { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic - the message is just a string, not a list + Object actual = run("KAFKA_PUT(topic, message1)"); + + // validate + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertEquals(topicName, view.get("topic")); + assertEquals(0, view.get("partition")); + assertEquals(0L, view.get("offset")); + assertNotNull(view.get("timestamp")); + + } + + /** + * KAFKA_GET should allow a user to see a detailed view of each Kafka record. + */ + @Test + public void testKafkaGetWithRichView() { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); // use a unique topic name for this test final String topicName = testName.getMethodName(); @@ -179,7 +252,17 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Object actual = run("KAFKA_GET(topic)"); // validate - assertEquals(Collections.singletonList(message1), actual); + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertEquals(0L, view.get("offset")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message1, view.get("value")); } /** @@ -300,6 +383,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** 
+ * KAFKA_TAIL should allow a user to see a rich view of each Kafka record. + */ + @Test + public void testKafkaTailWithRichView() throws Exception { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic; KAFKA tail should NOT retrieve these + run("KAFKA_PUT(topic, [message2, message2, message2])"); + + // get a message from the topic; will block until messages arrive + Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)"); + + // put 10 messages onto the topic for KAFKA_TAIL to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])")); + + // wait for KAFKA_TAIL to complete + Object actual = tailFuture.get(10, TimeUnit.SECONDS); + + // validate + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message1, view.get("value")); + assertNotNull(view.get("offset")); + } + + /** * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer * * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user @@ -339,7 +461,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Map<String, String> properties = (Map<String, String>) run(expression); assertEquals(expected, properties.get(overriddenKey)); } - + /** * KAFKA_FIND should only return messages that satisfy a filter expression. 
*/ @@ -385,6 +507,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_FIND should allow a user to see a detailed view of each Kafka record. + */ + @Test + public void testKafkaFindWithRichView() throws Exception { + + // configure a detailed view of each message + global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // find all messages satisfying the filter expression + Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)"); + + // put 10 messages onto the topic for KAFKA_FIND to search + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])")); + + // validate + Object actual = future.get(10, TimeUnit.SECONDS); + assertTrue(actual instanceof List); + List<Object> results = (List) actual; + assertEquals(1, results.size()); + + // expect a 'rich' view of the record + Map<String, Object> view = (Map) results.get(0); + assertNull(view.get("key")); + assertNotNull(view.get("offset")); + assertEquals(0, view.get("partition")); + assertEquals(topicName, view.get("topic")); + assertEquals(message2, view.get("value")); + } + + /** * KAFKA_FIND should return no more messages than its limit. */ @Test @@ -491,4 +647,3 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } } } -
