METRON-1572 Enhance KAFKA_PUT function (nickwallen) closes apache/metron#1024
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/40796c06 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/40796c06 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/40796c06 Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: 40796c06ad96ae45dd853925fbae8c26509f6c2f Parents: ae1d3eb Author: nickwallen <[email protected]> Authored: Fri Jun 8 08:39:20 2018 -0400 Committer: nickallen <[email protected]> Committed: Fri Jun 8 08:39:20 2018 -0400 ---------------------------------------------------------------------- .../metron/management/KafkaFunctions.java | 87 +++++++++++++++----- .../KafkaFunctionsIntegrationTest.java | 21 +++++ 2 files changed, 89 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/40796c06/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index 316e19d..a0c92eb 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; @@ -37,7 +38,7 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -46,6 +47,9 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static java.lang.String.format; import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG; @@ -332,18 +336,26 @@ public class KafkaFunctions { /** * KAFKA_PUT * - * Sends messages to a Kafka topic. + * <p>Sends messages to a Kafka topic. * - * Example: Put two messages on the topic 'topic'. + * <p>Example: Put two messages on the topic 'topic'. + * <pre> + * {@code * KAFKA_PUT('topic', ["message1", "message2"]) + * } + * </pre> * - * Example: Put a message on a topic and also define an alternative Kafka broker. + * <p>Example: Put a message on a topic and also define an alternative Kafka broker. + * <pre> + * {@code * KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" }) + * } + * </pre> */ @Stellar( namespace = "KAFKA", name = "PUT", - description = "Sends messages to a Kafka topic.", + description = "Sends messages to a Kafka topic. ", params = { "topic - The name of the Kafka topic.", "messages - A list of messages to write.", @@ -355,45 +367,82 @@ public class KafkaFunctions { @Override public Object apply(List<Object> args, Context context) throws ParseException { - String topic = ConversionUtils.convert(args.get(0), String.class); - List<String> messages = ConversionUtils.convert(args.get(1), List.class); - // build the properties for kafka + List<String> messages; + if(args.get(1) instanceof String) { + // a single message needs sent + String msg = ConversionUtils.convert(args.get(1), String.class); + messages = Collections.singletonList(msg); + + } else { + // a list of messages; all need sent + messages = ConversionUtils.convert(args.get(1), List.class); + } + + // are there any overrides? Map<String, String> overrides = new HashMap<>(); if(args.size() > 2) { overrides = ConversionUtils.convert(args.get(2), Map.class); } - Properties properties = buildKafkaProperties(overrides, context); // send the messages - try { - send(topic, messages, properties); - - } catch(InterruptedException | ExecutionException e) { - throw new ParseException(e.getMessage(), e); - } + Properties properties = buildKafkaProperties(overrides, context); + putMessages(topic, messages, properties); return null; } /** - * Send each message synchronously. + * Put messages to a Kafka topic. + * + * <p>Sends each message synchronously. + * * @param topic The topic to send messages to. * @param messages The messages to send. * @param properties The properties to use with Kafka. */ - private void send(String topic, List<String> messages, Properties properties) throws InterruptedException, ExecutionException { + private void putMessages(String topic, List<String> messages, Properties properties) { + LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size()); try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) { - // send each message synchronously, hence the get() + List<Future<RecordMetadata>> futures = new ArrayList<>(); + + // send each message for(String msg : messages) { - producer.send(new ProducerRecord<>(topic, msg)).get(); + Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, msg)); + futures.add(future); + } + + // wait for the sends to complete + for(Future<RecordMetadata> future : futures) { + waitForResponse(future, properties); } + producer.flush(); } } + /** + * Wait for response to the message being sent. + * + * @param future The future for the message being sent. + * @param properties The configuration properties. + * @return + */ + private void waitForResponse(Future<RecordMetadata> future, Properties properties) { + int maxWait = getMaxWait(properties); + try { + // wait for the record and then render it for the user + RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS); + LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + + } catch(TimeoutException | InterruptedException | ExecutionException e) { + LOG.error("KAFKA_PUT message send failure", e); + } + } + @Override public void initialize(Context context) { // no initialization required http://git-wip-us.apache.org/repos/asf/metron/blob/40796c06/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index 74c6705..ad45b52 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -161,6 +161,26 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_PUT should be able to write a message passed as a String, rather than a List. + */ + @Test + public void testKafkaPutOneMessagePassedAsString() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic - the message is just a string, not a list + run("KAFKA_PUT(topic, message1)"); + + // get a message from the topic + Object actual = run("KAFKA_GET(topic)"); + + // validate + assertEquals(Collections.singletonList(message1), actual); + } + + /** * KAFKA_PUT should be able to write multiple messages passed as a List. * KAFKA_GET should be able to read multiple messages at once. */ @@ -373,3 +393,4 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } } } +
