METRON-1549: Add empty object test to WriterBoltIntegrationTest implementation (mmiklavc via mmiklavc) closes apache/metron#1009
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b9453aab Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b9453aab Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b9453aab Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: b9453aabd781c7c67258d9506af176fbcab85be1 Parents: a17c1ad Author: mmiklavc <michael.miklav...@gmail.com> Authored: Fri May 11 12:04:01 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Fri May 11 12:04:01 2018 -0600 ---------------------------------------------------------------------- .../integration/WriterBoltIntegrationTest.java | 315 ++++++++++++++----- 1 file changed, 231 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/b9453aab/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index cde08bc..d565147 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -17,40 +17,56 @@ */ package org.apache.metron.writers.integration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import javax.annotation.Nullable; import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.SensorParserConfig; -import org.apache.metron.stellar.dsl.Context; import org.apache.metron.common.field.validation.FieldValidation; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.components.ConfigUploadComponent; -import org.apache.metron.integration.*; +import org.apache.metron.integration.BaseIntegrationTest; +import org.apache.metron.integration.ComponentRunner; +import org.apache.metron.integration.Processor; +import org.apache.metron.integration.ProcessorResult; +import org.apache.metron.integration.ReadinessState; import org.apache.metron.integration.components.KafkaComponent; -import org.apache.metron.integration.processors.KafkaMessageSet; import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.metron.integration.processors.KafkaMessageSet; import org.apache.metron.integration.processors.KafkaProcessor; -import org.apache.metron.parsers.csv.CSVParser; import org.apache.metron.parsers.integration.components.ParserTopologyComponent; -import org.apache.metron.test.utils.UnitTestHelper; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; -import org.json.simple.parser.ParseException; import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; - public class WriterBoltIntegrationTest extends BaseIntegrationTest { + private ZKServerComponent zkServerComponent; + private KafkaComponent kafkaComponent; + private ConfigUploadComponent configUploadComponent; + private ParserTopologyComponent parserTopologyComponent; - public static class MockValidator implements FieldValidation{ + public static class MockValidator implements FieldValidation { @Override public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig, Context context) { - if(input.get("action").equals("invalid")) { + if (input.get("action").equals("invalid")) { return false; } return true; @@ -60,6 +76,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { public void initialize(Map<String, Object> validationConfig, Map<String, Object> globalConfig) { } } + /** * { * "fieldValidations" : [ @@ -68,7 +85,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { * } */ @Multiline - public static String globalConfig; + public static String globalConfigWithValidation; /** * { @@ -88,57 +105,23 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { public static String parserConfigJSON; @Test - public void test() throws UnableToStartException, IOException, ParseException { - - UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL); + public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception { final String sensorType = "dummy"; - SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class); - - // the input messages to parser final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ add(Bytes.toBytes("valid,foo")); add(Bytes.toBytes("invalid,foo")); add(Bytes.toBytes("error")); }}; - // setup external components; zookeeper, kafka final Properties topologyProperties = new Properties(); - final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); - final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ - add(new KafkaComponent.Topic(sensorType, 1)); - add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1)); - add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); - }}); - topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); - - ConfigUploadComponent configUploadComponent = new ConfigUploadComponent() - .withTopologyProperties(topologyProperties) - .withGlobalConfig(globalConfig) - .withParserSensorConfig(sensorType, parserConfig); - - ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder() - .withSensorType(sensorType) - .withTopologyProperties(topologyProperties) - .withBrokerUrl(kafkaComponent.getBrokerList()) - .withErrorTopic(parserConfig.getErrorTopic()) - .withOutputTopic(parserConfig.getOutputTopic()) - .build(); - - ComponentRunner runner = new ComponentRunner.Builder() - .withComponent("zk", zkServerComponent) - .withComponent("kafka", kafkaComponent) - .withComponent("config", configUploadComponent) - .withComponent("org/apache/storm", parserTopologyComponent) - .withMillisecondsBetweenAttempts(5000) - .withNumRetries(10) - .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"}) - .build(); + ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation); try { runner.start(); kafkaComponent.writeMessages(sensorType, inputMessages); - ProcessorResult<Map<String, List<JSONObject>>> result = runner.process( - getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic())); + KafkaProcessor<Map<String, List<JSONObject>>> kafkaProcessor = getKafkaProcessor( + parserConfig.getOutputTopic(), parserConfig.getErrorTopic()); + ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(kafkaProcessor); // validate the output messages Map<String,List<JSONObject>> outputMessages = result.getResult(); @@ -166,45 +149,209 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { } } + /** + * Setup external components (as side effects of invoking this method): + * zookeeper, kafka, config upload, parser topology, main runner. + * + * Modifies topology properties with relevant component properties, e.g. kafka.broker. + * + * @return runner + */ + public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType, + SensorParserConfig parserConfig, String globalConfig) { + zkServerComponent = getZKServerComponent(topologyProperties); + kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ + add(new KafkaComponent.Topic(sensorType, 1)); + add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1)); + add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); + }}); + topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); + + configUploadComponent = new ConfigUploadComponent() + .withTopologyProperties(topologyProperties) + .withGlobalConfig(globalConfig) + .withParserSensorConfig(sensorType, parserConfig); + + parserTopologyComponent = new ParserTopologyComponent.Builder() + .withSensorType(sensorType) + .withTopologyProperties(topologyProperties) + .withBrokerUrl(kafkaComponent.getBrokerList()) + .withErrorTopic(parserConfig.getErrorTopic()) + .withOutputTopic(parserConfig.getOutputTopic()) + .build(); + + return new ComponentRunner.Builder() + .withComponent("zk", zkServerComponent) + .withComponent("kafka", kafkaComponent) + .withComponent("config", configUploadComponent) + .withComponent("org/apache/storm", parserTopologyComponent) + .withMillisecondsBetweenAttempts(5000) + .withNumRetries(10) + .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"}) + .build(); + } + + @SuppressWarnings("unchecked") + private KafkaProcessor<Map<String, List<JSONObject>>> getKafkaProcessor(String outputTopic, + String errorTopic) { + + return new KafkaProcessor<>() + .withKafkaComponentName("kafka") + .withReadTopic(outputTopic) + .withErrorTopic(errorTopic) + .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable KafkaMessageSet messageSet) { + return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2); + } + }) + .withProvideResult(new Function<KafkaMessageSet, Map<String, List<JSONObject>>>() { + @Nullable + @Override + public Map<String, List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) { + return new HashMap<String, List<JSONObject>>() {{ + put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages())); + put(errorTopic, loadMessages(messageSet.getErrors())); + }}; + } + }); + } + private static List<JSONObject> loadMessages(List<byte[]> outputMessages) { List<JSONObject> tmp = new ArrayList<>(); Iterables.addAll(tmp, - Iterables.transform(outputMessages, - message -> { - try { - return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER)); - } catch (Exception ex) { - throw new IllegalStateException(ex); - } - } - ) + Iterables.transform(outputMessages, + message -> { + try { + return new JSONObject( + JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER)); + } catch (Exception ex) { + throw new IllegalStateException(ex); + } + } + ) ); return tmp; } - @SuppressWarnings("unchecked") - private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){ + /** + * { } + */ + @Multiline + public static String globalConfigEmpty; - return new KafkaProcessor<>() - .withKafkaComponentName("kafka") - .withReadTopic(outputTopic) - .withErrorTopic(errorTopic) - .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { - @Nullable - @Override - public Boolean apply(@Nullable KafkaMessageSet messageSet) { - return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2); - } - }) - .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){ - @Nullable - @Override - public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) { - return new HashMap<String, List<JSONObject>>() {{ - put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages())); - put(errorTopic, loadMessages(messageSet.getErrors())); - }}; - } - }); + /** + * { + * "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$EmptyObjectParser", + * "sensorTopic":"emptyobjectparser", + * "outputTopic": "enrichments", + * "errorTopic": "parser_error" + * } + */ + @Multiline + public static String offsetParserConfigJSON; + + @Test + public void commits_kafka_offsets_for_emtpy_objects() throws Exception { + final String sensorType = "emptyobjectparser"; + SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class); + final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ + add(Bytes.toBytes("foo")); + add(Bytes.toBytes("bar")); + add(Bytes.toBytes("baz")); + }}; + final Properties topologyProperties = new Properties(); + ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty); + try { + runner.start(); + kafkaComponent.writeMessages(sensorType, inputMessages); + Processor allResultsProcessor = new AllResultsProcessor(inputMessages, Constants.ENRICHMENT_TOPIC); + ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor); + + // validate the output messages + assertThat("size should match", result.getResult().size(), equalTo(inputMessages.size())); + for (JSONObject record : result.getResult()) { + assertThat("record should have a guid", record.containsKey("guid"), equalTo(true)); + assertThat("record should have correct source.type", record.get("source.type"), + equalTo(sensorType)); + } + } finally { + if (runner != null) { + runner.stop(); + } + } + } + + /** + * Goal is to check returning an empty JSONObject in our List returned by parse. + */ + public static class EmptyObjectParser implements MessageParser<JSONObject>, Serializable { + + @Override + public void init() { + } + + @Override + public List<JSONObject> parse(byte[] bytes) { + return ImmutableList.of(new JSONObject()); + } + + @Override + public boolean validate(JSONObject message) { + return true; } + + @Override + public void configure(Map<String, Object> map) { + } + } + + /** + * Verifies all messages in the provided List of input messages appears in the specified + * Kafka output topic + */ + private class AllResultsProcessor implements Processor<Set<JSONObject>> { + + private final List<byte[]> inputMessages; + private String outputKafkaTopic; + // used for calculating readiness and returning result set + private final Set<JSONObject> outputMessages = new HashSet<>(); + + public AllResultsProcessor(List<byte[]> inputMessages, String outputKafkaTopic) { + this.inputMessages = inputMessages; + this.outputKafkaTopic = outputKafkaTopic; + } + + @Override + public ReadinessState process(ComponentRunner runner) { + KafkaComponent kc = runner.getComponent("kafka", KafkaComponent.class); + outputMessages.addAll(readMessagesFromKafka(kc, outputKafkaTopic)); + return calcReadiness(inputMessages.size(), outputMessages.size()); + } + + private Set<JSONObject> readMessagesFromKafka(KafkaComponent kc, String topic) { + Set<JSONObject> out = new HashSet<>(); + for (byte[] b : kc.readMessages(topic)) { + try { + JSONObject m = new JSONObject( + JSONUtils.INSTANCE.load(new String(b), JSONUtils.MAP_SUPPLIER)); + out.add(m); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + return out; + } + + private ReadinessState calcReadiness(int in, int out) { + return in == out ? ReadinessState.READY : ReadinessState.NOT_READY; + } + + @Override + public ProcessorResult<Set<JSONObject>> getResult() { + return new ProcessorResult<>(outputMessages, null); + } + } + }