METRON-1642: KafkaWriter should be able to choose the topic from a field in 
addition to at topology construction time. Closes apache/incubator-metron#1082


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/097ce950
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/097ce950
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/097ce950

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 097ce95030e31bf9bd2af74aa56dc03348f7fef7
Parents: fc9ff85
Author: cstella <[email protected]>
Authored: Tue Jul 3 09:29:47 2018 -0400
Committer: cstella <[email protected]>
Committed: Tue Jul 3 09:29:47 2018 -0400

----------------------------------------------------------------------
 metron-platform/metron-parsers/README.md        |  1 +
 .../integration/WriterBoltIntegrationTest.java  | 75 +++++++++++++++++++-
 metron-platform/metron-writer/README.md         | 24 +++++++
 .../apache/metron/writer/kafka/KafkaWriter.java | 43 +++++++++--
 .../metron/writer/kafka/KafkaWriterTest.java    | 63 ++++++++++++++++
 5 files changed, 199 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index d79b9ce..7ddfdea 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -166,6 +166,7 @@ then it is assumed to be a regex and will match any topic 
matching the pattern (
   * `batchTimeout` : The timeout after which a batch will be flushed even if 
batchSize has not been met.  Optional.
     If unspecified, or set to `0`, it defaults to a system-determined duration 
which is a fraction of the Storm
     parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, 
since this disables batching.
+  * The Kafka writer can also be configured within the parser config.  (Its settings are normally configured a priori, but this is a convenient way to override them.)  See [here](../metron-writer/README.md#kafka-writer).
 * `fieldTransformations` : An array of complex objects representing the 
transformations to be done on the message generated from the parser before 
writing out to the kafka topic.
 * `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can 
be overridden on the command line.
 * `spoutNumTasks` : The number of tasks for the spout (default to `1`). This 
can be overridden on the command line.

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index dfadfdc..99506de 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.Serializable;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -66,7 +68,7 @@ public class WriterBoltIntegrationTest extends 
BaseIntegrationTest {
 
     @Override
     public boolean isValid(Map<String, Object> input, Map<String, Object> 
validationConfig, Map<String, Object> globalConfig, Context context) {
-      if (input.get("action").equals("invalid")) {
+      if (input.get("action") != null && 
input.get("action").equals("invalid")) {
         return false;
       }
       return true;
@@ -105,6 +107,69 @@ public class WriterBoltIntegrationTest extends 
BaseIntegrationTest {
   @Multiline
   public static String parserConfigJSON;
 
+  // Parser config for test_topic_redirection; the Javadoc below is the field's value (populated
+  // by @Multiline). The Stellar transformation sets route_field to 'output' when name == 'metron'
+  // and to NULL otherwise, and "kafka.topicField" makes the Kafka writer route on that field.
+  /**
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
+   *    "sensorTopic": "dummy",
+   *    "outputTopic": "output",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "batchSize" : 1,
+   *        "columns" : {
+   *            "name" : 0,
+   *            "dummy" : 1
+   *        },
+   *      "kafka.topicField" : "route_field"
+   *    }
+   *    ,"fieldTransformations" : [
+   *    {
+   *      "transformation" : "STELLAR"
+   *     ,"input" :  ["name"]
+   *     ,"output" :  ["route_field"]
+   *     ,"config" : {
+   *        "route_field" : "match{ name == 'metron' => 'output', default => 
NULL}"
+   *      }
+   *    }
+   *    ]
+   * }
+   */
+  @Multiline
+  public static String parserConfigJSONKafkaRedirection;
+
+  /**
+   * Verifies per-message topic routing via {@code kafka.topicField}. The Stellar transformation
+   * sets {@code route_field} to 'output' for the three "metron" records and to NULL for the
+   * "notmetron" record. The processor therefore waits for exactly three messages and no errors.
+   * The unrouted record is expected to be dropped by the writer rather than reported as an error.
+   */
+  @Test
+  public void test_topic_redirection() throws Exception {
+    final String sensorType = "dummy";
+    SensorParserConfig parserConfig = 
JSONUtils.INSTANCE.load(parserConfigJSONKafkaRedirection, 
SensorParserConfig.class);
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("metron,foo"));
+      add(Bytes.toBytes("notmetron,foo"));
+      add(Bytes.toBytes("metron,bar"));
+      add(Bytes.toBytes("metron,baz"));
+    }};
+
+    final Properties topologyProperties = new Properties();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, 
sensorType, parserConfig, globalConfigWithValidation);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      // Done once 3 routed messages arrive and nothing landed on the error topic.
+      KafkaProcessor<Map<String, List<JSONObject>>> kafkaProcessor = 
getKafkaProcessor(
+          parserConfig.getOutputTopic(), parserConfig.getErrorTopic(), 
kafkaMessageSet -> kafkaMessageSet.getMessages().size() == 3 && 
kafkaMessageSet.getErrors().isEmpty());
+      ProcessorResult<Map<String, List<JSONObject>>> result = 
runner.process(kafkaProcessor);
+
+      // validate the output messages
+      // NOTE(review): the map key is chosen by getKafkaProcessor's result function (not fully
+      // visible here); confirm output messages are stored under Constants.ENRICHMENT_TOPIC.
+      Map<String,List<JSONObject>> outputMessages = result.getResult();
+      for(JSONObject j : outputMessages.get(Constants.ENRICHMENT_TOPIC)) {
+        Assert.assertEquals("metron", j.get("name"));
+        Assert.assertEquals("output", j.get("route_field"));
+        Assert.assertTrue(ImmutableSet.of("foo", "bar", 
"baz").contains(j.get("dummy")));
+      }
+    } finally {
+      if(runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
   @Test
   public void 
parser_with_global_validations_writes_bad_records_to_error_topic() throws 
Exception {
     final String sensorType = "dummy";
@@ -192,9 +257,13 @@ public class WriterBoltIntegrationTest extends 
BaseIntegrationTest {
         .build();
   }
 
-  @SuppressWarnings("unchecked")
   private KafkaProcessor<Map<String, List<JSONObject>>> 
getKafkaProcessor(String outputTopic,
       String errorTopic) {
+    return getKafkaProcessor(outputTopic, errorTopic, messageSet -> 
(messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2));
+  }
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<Map<String, List<JSONObject>>> 
getKafkaProcessor(String outputTopic,
+      String errorTopic, Predicate<KafkaMessageSet> predicate) {
 
     return new KafkaProcessor<>()
         .withKafkaComponentName("kafka")
@@ -204,7 +273,7 @@ public class WriterBoltIntegrationTest extends 
BaseIntegrationTest {
           @Nullable
           @Override
           public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-            return (messageSet.getMessages().size() == 1) && 
(messageSet.getErrors().size() == 2);
+            return predicate.test(messageSet);
           }
         })
         .withProvideResult(new Function<KafkaMessageSet, Map<String, 
List<JSONObject>>>() {

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/README.md 
b/metron-platform/metron-writer/README.md
index 16c6686..21cdbca 100644
--- a/metron-platform/metron-writer/README.md
+++ b/metron-platform/metron-writer/README.md
@@ -20,6 +20,30 @@ limitations under the License.
 ## Introduction
 The writer module provides some utilties for writing to outside components 
from within Storm.  This includes managing bulk writing.  An implemention is 
included for writing to HDFS in this module. Other writers can be found in 
their own modules.
 
+## Kafka Writer
+This module also provides a writer that writes batches of messages to
+Kafka.  Notably, this writer can be configured to take each message's
+destination topic from a field in that message, rather than always
+using a single fixed topic.
+
+The configuration for this writer is held in the individual Sensor
+Configurations:
+* [Enrichment](../metron-enrichment/README.md#sensor-enrichment-configuration) 
under the `config` element
+* [Parsers](../metron-parsers/README.md#parser-configuration) in the 
`parserConfig` element
+* Profiler - currently unsupported
+
+In each of these, the kafka writer can be configured via a map which has
+the following elements:
+* `kafka.brokerUrl` : The broker URL
+* `kafka.keySerializer` : The key serializer (defaults to `StringSerializer`)
+* `kafka.valueSerializer` : The value serializer (defaults to `StringSerializer`)
+* `kafka.zkQuorum` : The ZooKeeper quorum
+* `kafka.requiredAcks` : The number of acknowledgements the producer requires for a write (defaults to `1`)
+* `kafka.topic` : The topic to write to
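+* `kafka.topicField` : The name of a message field whose value is used as that message's topic.  If this is specified, it takes precedence over `kafka.topic`; if it is unspecified, the writer uses `kafka.topic`.  If neither is specified, the writer defaults to the enrichment topic.  Messages for which no topic can be determined (for example, `kafka.topicField` is set but the field is absent from the message) are dropped rather than written.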
+* `kafka.producerConfigs` : A map of kafka producer configs for advanced 
customization.
+ 
+
 ## HDFS Writer
 The HDFS writer included here expands on what Storm has in several ways. 
There's customization in syncing to HDFS, rotation policy, etc. In addition, 
the writer allows for users to define output paths based on the fields in the 
provided JSON message.  This can be defined using Stellar.
 

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index efb2418..599ecbd 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -19,6 +19,7 @@ package org.apache.metron.writer.kafka;
 
 import com.google.common.base.Joiner;
 import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -41,8 +42,11 @@ import org.apache.metron.writer.AbstractWriter;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public enum Configurations {
      BROKER("kafka.brokerUrl")
     ,KEY_SERIALIZER("kafka.keySerializer")
@@ -50,6 +54,7 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
     ,VALUE_SERIALIZER("kafka.valueSerializer")
     ,REQUIRED_ACKS("kafka.requiredAcks")
     ,TOPIC("kafka.topic")
+    ,TOPIC_FIELD("kafka.topicField")
     ,PRODUCER_CONFIGS("kafka.producerConfigs");
     ;
     String key;
@@ -81,6 +86,7 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
   private String valueSerializer = 
"org.apache.kafka.common.serialization.StringSerializer";
   private int requiredAcks = 1;
   private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
+  private String kafkaTopicField = null;
   private KafkaProducer kafkaProducer;
   private String configPrefix = null;
   private String zkQuorum = null;
@@ -120,6 +126,12 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
     this.kafkaTopic= topic;
     return this;
   }
+
+  /**
+   * Enables per-message topic selection. When set, each message is written to the topic named
+   * by the value of this field instead of the fixed {@code kafkaTopic}.
+   *
+   * @param topicField name of the message field holding the topic; {@code null} disables
+   *     per-message selection
+   * @return this writer, to allow call chaining
+   */
+  public KafkaWriter withTopicField(final String topicField) {
+    kafkaTopicField = topicField;
+    return this;
+  }
+
   public KafkaWriter withConfigPrefix(String prefix) {
     this.configPrefix = prefix;
     return this;
@@ -166,6 +178,10 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
     if(topic != null) {
       withTopic(topic);
     }
+    String topicField = 
Configurations.TOPIC_FIELD.getAndConvert(getConfigPrefix(), configMap, 
String.class);
+    if(topicField != null) {
+      withTopicField(topicField);
+    }
     Map<String, Object> producerConfigs = 
(Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap);
     if(producerConfigs != null) {
       withProducerConfigs(producerConfigs);
@@ -197,6 +213,19 @@ public class KafkaWriter extends AbstractWriter implements 
BulkMessageWriter<JSO
     return producerConfig;
   }
 
+  public Optional<String> getKafkaTopic(JSONObject message) {
+    String t = null;
+    if(kafkaTopicField != null) {
+      t = (String)message.get(kafkaTopicField);
+      LOG.debug("Sending to topic: {} based on the field {}", t, 
kafkaTopicField);
+    }
+    else {
+      t = kafkaTopic;
+      LOG.debug("Sending to topic: {}", t);
+    }
+    return Optional.ofNullable(t);
+  }
+
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations,
       Iterable<Tuple> tuples, List<JSONObject> messages) {
@@ -212,10 +241,16 @@ public class KafkaWriter extends AbstractWriter 
implements BulkMessageWriter<JSO
         writerResponse.addError(t, tuple);
         continue;
       }
-      Future future = kafkaProducer
-          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
-      // we want to manage the batching
-      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+      Optional<String> topic = getKafkaTopic(message);
+      if(topic.isPresent()) {
+        Future future = kafkaProducer
+            .send(new ProducerRecord<String, String>(topic.get(), 
jsonMessage));
+        // we want to manage the batching
+        results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+      }
+      else {
+        LOG.debug("Dropping {} because no topic is specified.", jsonMessage);
+      }
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/metron/blob/097ce950/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
index 1b95430..9d201b8 100644
--- 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
@@ -19,10 +19,12 @@
 package org.apache.metron.writer.kafka;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -85,4 +87,65 @@ public class KafkaWriterTest {
     Assert.assertEquals(producerConfigs.get("key1"), 1);
     Assert.assertEquals(producerConfigs.get("key2"), "value2");
   }
+
+  /**
+   * With both {@code kafka.topic} and {@code kafka.topicField} configured, the field takes
+   * precedence. A message carrying the field is routed to the field's value. A message without
+   * the field yields no topic; the fixed topic is NOT used as a fallback.
+   */
+  @Test
+  public void testTopicField_bothTopicAndFieldSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topic" , SENSOR_TYPE);
+              put("kafka.topicField" , "kafka_topic");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", 
"value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals( "metron"
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    // No field in the message -> no topic, even though kafka.topic is set.
+    Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
+
+  }
+
+  /**
+   * With only {@code kafka.topicField} configured, a message carrying the field is routed to the
+   * field's value, and a message without it yields no topic.
+   */
+  @Test
+  public void testTopicField_onlyFieldSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topicField" , "kafka_topic");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", 
"value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals( "metron"
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() );
+  }
+
+  /**
+   * With neither {@code kafka.topic} nor {@code kafka.topicField} configured, the writer falls
+   * back to its default topic, {@code Constants.ENRICHMENT_TOPIC}. A "kafka_topic" value in the
+   * message is ignored because no topic field is configured.
+   */
+  @Test
+  public void testTopicField_neitherSpecified() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", 
"value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Assert.assertEquals(Constants.ENRICHMENT_TOPIC
+                       , writer.getKafkaTopic(new JSONObject() {{
+                          put("kafka_topic", "metron");
+                         }}).get()
+                       );
+    Assert.assertTrue( writer.getKafkaTopic(new JSONObject()).isPresent() );
+  }
 }

Reply via email to