http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 15ce735..06f4cec 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -58,6 +58,7 @@ import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; import org.apache.metron.parsers.BasicParser; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.topology.ParserComponents; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.test.bolt.BaseBoltTest; import org.apache.metron.test.error.MetronErrorJSONMatcher; @@ -185,7 +186,15 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testEmpty() throws Exception { String sensorType = "yaf"; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + null, + new WriterHandler(writer) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(); @@ -209,7 +218,7 @@ public class ParserBoltTest extends BaseBoltTest { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_ERROR) .withThrowable(new NullPointerException()) - .withSensorType(sensorType) + 
.withSensorType(Collections.singleton(sensorType)) .addRawMessage(sampleBinary); verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } @@ -217,7 +226,15 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testInvalid() throws Exception { String sensorType = "yaf"; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + null, + new WriterHandler(writer) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(); @@ -243,7 +260,7 @@ public class ParserBoltTest extends BaseBoltTest { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_INVALID) - .withSensorType(sensorType) + .withSensorType(Collections.singleton(sensorType)) .withErrorFields(new HashSet<String>() {{ add("field"); }}) .addRawMessage(new JSONObject(){{ put("field", "invalidValue"); @@ -255,14 +272,20 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void test() throws Exception { - String sensorType = "yaf"; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + null, + new WriterHandler(writer) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(); } - }; parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); @@ -290,7 +313,6 @@ public class ParserBoltTest extends BaseBoltTest { 
when(parser.validate(eq(messages.get(1)))).thenReturn(true); when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false); when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true); - parserBolt.withMessageFilter(filter); parserBolt.execute(tuple); verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2)); verify(outputCollector, times(2)).ack(tuple); @@ -317,21 +339,15 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testFilterSuccess() throws Exception { String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { - @Override - protected SensorParserConfig getSensorParserConfig() { - try { - return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(1)); - } - }; + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + null, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig); parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); @@ -358,10 +374,17 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testFilterFailure() throws Exception { String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + null, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override - protected SensorParserConfig getSensorParserConfig() { + protected SensorParserConfig 
getSensorParserConfig(String sensorType) { try { return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig)); } catch (IOException e) { @@ -433,21 +456,15 @@ public class ParserBoltTest extends BaseBoltTest { } }; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, dummyParser, new WriterHandler(recordingWriter)) { - @Override - protected SensorParserConfig getSensorParserConfig() { - try { - return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(1)); - } - }; + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + dummyParser, + null, + new WriterHandler(recordingWriter) + ) + ); + ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations); parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); @@ -461,10 +478,16 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testDefaultBatchSize() throws Exception { - String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + filter, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { // this uses default batch size @@ -487,7 +510,6 @@ public class ParserBoltTest extends BaseBoltTest { response.addSuccess(uniqueTuples[i]); } when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response); - parserBolt.withMessageFilter(filter); for (Tuple 
tuple : uniqueTuples) { parserBolt.execute(tuple); } @@ -498,10 +520,16 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testLessRecordsThanDefaultBatchSize() throws Exception { - String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + filter, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { // this uses default batch size @@ -524,7 +552,6 @@ public class ParserBoltTest extends BaseBoltTest { uniqueTuples[i] = mock(Tuple.class); response.addSuccess(uniqueTuples[i]); } - parserBolt.withMessageFilter(filter); for (Tuple tuple : uniqueTuples) { parserBolt.execute(tuple); } @@ -542,10 +569,16 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testBatchOfOne() throws Exception { - String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + filter, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(Optional.of(1)); @@ -563,17 +596,22 @@ public class ParserBoltTest extends BaseBoltTest { BulkWriterResponse response = new BulkWriterResponse(); response.addSuccess(t1); when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response); - parserBolt.withMessageFilter(filter); parserBolt.execute(t1); verify(outputCollector, times(1)).ack(t1); } @Test 
public void testBatchOfFive() throws Exception { - String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + filter, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(Optional.of(5)); @@ -592,7 +630,6 @@ public class ParserBoltTest extends BaseBoltTest { BulkWriterResponse response = new BulkWriterResponse(); response.addAllSuccesses(tuples); when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response); - parserBolt.withMessageFilter(filter); writeNonBatch(outputCollector, parserBolt, t1); writeNonBatch(outputCollector, parserBolt, t2); writeNonBatch(outputCollector, parserBolt, t3); @@ -610,9 +647,16 @@ public class ParserBoltTest extends BaseBoltTest { @Test public void testBatchOfFiveWithError() throws Exception { - String sensorType = "yaf"; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + Map<String, ParserComponents> parserMap = Collections.singletonMap( + sensorType, + new ParserComponents( + parser, + filter, + new WriterHandler(batchWriter) + ) + ); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { return ParserBoltTest.createUpdater(Optional.of(5)); @@ -629,7 +673,6 @@ public class ParserBoltTest extends BaseBoltTest { when(parser.validate(any())).thenReturn(true); when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - parserBolt.withMessageFilter(filter); 
parserBolt.execute(t1); parserBolt.execute(t2); parserBolt.execute(t3); @@ -654,6 +697,25 @@ public class ParserBoltTest extends BaseBoltTest { parserBolt.getConfigurations().updateGlobalConfig(globalConfig); } + private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap, + String csvWithFieldTransformations) { + return new ParserBolt("zookeeperUrl", parserMap) { + @Override + protected SensorParserConfig getSensorParserConfig(String sensorType) { + try { + return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(Optional.of(1)); + } + }; + } + private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) { bolt.execute(t); }
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java index a23c368..b04d8f7 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java @@ -18,6 +18,20 @@ package org.apache.metron.parsers.bolt; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.log4j.Level; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.IndexingConfigurations; @@ -38,20 +52,6 @@ import org.json.simple.JSONObject; import org.junit.Test; import org.mockito.Mock; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - public class WriterBoltTest extends BaseBoltTest{ @Mock protected TopologyContext topologyContext; @@ -164,7 
+164,7 @@ public class WriterBoltTest extends BaseBoltTest{ MetronError error = new MetronError() .withErrorType(Constants.ErrorType.DEFAULT_ERROR) .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}")) - .withSensorType(sensorType) + .withSensorType(Collections.singleton(sensorType)) .addRawMessage(new JSONObject()); verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java index ec7c3ab..2cba40a 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java @@ -17,9 +17,18 @@ */ package org.apache.metron.parsers.integration; -import com.google.common.collect.ImmutableList; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.lang.SerializationUtils; -import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.FieldValidator; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; @@ -30,29 +39,10 @@ import 
org.apache.metron.common.writer.MessageWriter; import org.apache.metron.integration.ProcessorResult; import org.apache.metron.parsers.bolt.ParserBolt; import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.task.GeneralTopologyContext; +import org.apache.metron.parsers.topology.ParserComponents; import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.MessageId; import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; import org.json.simple.JSONObject; -import org.mockito.Matchers; - -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,12 +84,20 @@ public class ParserDriver implements Serializable { public ShimParserBolt(List<byte[]> output) { super(null - , sensorType == null?config.getSensorTopic():sensorType - , ReflectionUtils.createInstance(config.getParserClassName()) - , new WriterHandler( new CollectingWriter(output)) + , Collections.singletonMap( + sensorType == null ? 
config.getSensorTopic() : sensorType, + new ParserComponents( + ReflectionUtils.createInstance(config.getParserClassName()), + null, + new WriterHandler(new CollectingWriter(output)) + ) + ) ); this.output = output; - getParser().configure(config.getParserConfig()); + Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap(); + for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) { + sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig()); + } } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 7f40684..15b53b7 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -17,6 +17,17 @@ */ package org.apache.metron.parsers.integration.components; +import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; +import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import 
org.apache.metron.integration.components.ZKServerComponent; @@ -27,22 +38,13 @@ import org.apache.storm.generated.KillOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; -import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; - public class ParserTopologyComponent implements InMemoryComponent { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private Properties topologyProperties; private String brokerUrl; - private String sensorType; + private List<String> sensorTypes; private LocalCluster stormCluster; private String outputTopic; private String errorTopic; @@ -51,7 +53,7 @@ public class ParserTopologyComponent implements InMemoryComponent { Properties topologyProperties; String brokerUrl; - String sensorType; + List<String> sensorTypes; String outputTopic; String errorTopic; @@ -63,8 +65,8 @@ public class ParserTopologyComponent implements InMemoryComponent { this.brokerUrl = brokerUrl; return this; } - public Builder withSensorType(String sensorType) { - this.sensorType = sensorType; + public Builder withSensorTypes(List<String> sensorTypes) { + this.sensorTypes = sensorTypes; return this; } @@ -80,7 +82,7 @@ public class ParserTopologyComponent implements InMemoryComponent { public ParserTopologyComponent build() { - if(sensorType == null) { + if(sensorTypes == null || sensorTypes.isEmpty()) { throw new IllegalArgumentException("The sensor type must be defined."); } @@ -88,20 +90,20 @@ public class ParserTopologyComponent implements InMemoryComponent { throw new IllegalArgumentException("The output topic must be defined."); } - return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, 
errorTopic); + return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorTypes, outputTopic, errorTopic); } } - public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) { + public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, List<String> sensorTypes, String outputTopic, String errorTopic) { this.topologyProperties = topologyProperties; this.brokerUrl = brokerUrl; - this.sensorType = sensorType; + this.sensorTypes = sensorTypes; this.outputTopic = outputTopic; this.errorTopic = errorTopic; } - public void updateSensorType(String sensorType) { - this.sensorType = sensorType; + public void updateSensorTypes(List<String> sensorTypes) { + this.sensorTypes = sensorTypes; } @Override @@ -112,14 +114,14 @@ public class ParserTopologyComponent implements InMemoryComponent { ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build ( topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY), Optional.ofNullable(brokerUrl), - sensorType, - (x,y) -> 1, - (x,y) -> 1, + sensorTypes, + (x,y) -> Collections.nCopies(sensorTypes.size(), 1), + (x,y) -> Collections.nCopies(sensorTypes.size(), 1), (x,y) -> 1, (x,y) -> 1, (x,y) -> 1, (x,y) -> 1, - (x,y) -> new HashMap<>(), + (x,y) -> Collections.nCopies(sensorTypes.size(), new HashMap<>()), (x,y) -> null, (x,y) -> outputTopic, (x,y) -> errorTopic, @@ -131,9 +133,9 @@ public class ParserTopologyComponent implements InMemoryComponent { ); stormCluster = new LocalCluster(); - stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology()); + stormCluster.submitTopology(getTopologyName(), stormConf, topologyBuilder.getBuilder().createTopology()); } catch (Exception e) { - throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e); + throw new UnableToStartException("Unable to start parser topology for 
sensorTypes: " + sensorTypes, e); } } @@ -177,7 +179,7 @@ public class ParserTopologyComponent implements InMemoryComponent { protected void killTopology() { KillOptions ko = new KillOptions(); ko.set_wait_secs(0); - stormCluster.killTopologyWithOpts(sensorType, ko); + stormCluster.killTopologyWithOpts(getTopologyName(), ko); try { // Actually wait for it to die. Thread.sleep(2000); @@ -185,4 +187,8 @@ public class ParserTopologyComponent implements InMemoryComponent { // Do nothing } } + + protected String getTopologyName() { + return StringUtils.join(sensorTypes, "__"); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index fcfc93b..ae459f4 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -18,31 +18,34 @@ package org.apache.metron.parsers.topology; -import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.function.Supplier; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; import org.apache.commons.cli.Parser; +import 
org.apache.commons.cli.PosixParser; +import org.apache.commons.io.FileUtils; import org.apache.log4j.Level; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.storm.Config; -import com.google.common.collect.ImmutableMap; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.MissingOptionException; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.lang.ref.Reference; -import java.util.*; -import java.util.function.Predicate; -import java.util.function.Supplier; - public class ParserTopologyCLITest { @@ -103,11 +106,11 @@ public class ParserTopologyCLITest { public void kafkaOffset(boolean longOpt) throws ParseException { CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor") .build(longOpt); Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli)); Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli)); - Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli)); + Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli)); } @Test public void testCLI_happyPath() throws ParseException { @@ -127,11 +130,11 @@ public class ParserTopologyCLITest { public void happyPath(boolean longOpt) throws ParseException { CommandLine cli = new 
CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor") .build(longOpt); Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli)); Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli)); - Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli)); + Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli)); } @Test @@ -143,7 +146,7 @@ public class ParserTopologyCLITest { public void testConfig_noExtra(boolean longOpt) throws ParseException { CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor") .with(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "1") .with(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "2") .with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3") @@ -166,7 +169,7 @@ public class ParserTopologyCLITest { public void testOutputTopic(boolean longOpt) throws ParseException { CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor") .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic") .build(longOpt); Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli)); @@ -193,7 +196,7 @@ public class ParserTopologyCLITest { FileUtils.write(extraFile, extraConfig); CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") 
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor") .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4") .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath()) .build(longOpt); @@ -208,50 +211,50 @@ public class ParserTopologyCLITest { } private static class ParserInput { - private Integer spoutParallelism; - private Integer spoutNumTasks; + private List<Integer> spoutParallelism; + private List<Integer> spoutNumTasks; private Integer parserParallelism; private Integer parserNumTasks; private Integer errorParallelism; private Integer errorNumTasks; - private Map<String, Object> spoutConfig; + private List<Map<String, Object>> spoutConfig; private String securityProtocol; private Config stormConf; private String outputTopic; private String errorTopic; - public ParserInput(ValueSupplier<Integer> spoutParallelism, - ValueSupplier<Integer> spoutNumTasks, + public ParserInput(ValueSupplier<List> spoutParallelism, + ValueSupplier<List> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, - ValueSupplier<Map> spoutConfig, + ValueSupplier<List> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, ValueSupplier<String> outputTopic, ValueSupplier<String> errorTopic, - SensorParserConfig config + List<SensorParserConfig> configs ) { - this.spoutParallelism = spoutParallelism.get(config, Integer.class); - this.spoutNumTasks = spoutNumTasks.get(config, Integer.class); - this.parserParallelism = parserParallelism.get(config, Integer.class); - this.parserNumTasks = parserNumTasks.get(config, Integer.class); - this.errorParallelism = errorParallelism.get(config, Integer.class); - this.errorNumTasks = errorNumTasks.get(config, Integer.class); - this.spoutConfig = 
spoutConfig.get(config, Map.class); - this.securityProtocol = securityProtocol.get(config, String.class); - this.stormConf = stormConf.get(config, Config.class); - this.outputTopic = outputTopic.get(config, String.class); - this.errorTopic = outputTopic.get(config, String.class); + this.spoutParallelism = spoutParallelism.get(configs, List.class); + this.spoutNumTasks = spoutNumTasks.get(configs, List.class); + this.parserParallelism = parserParallelism.get(configs, Integer.class); + this.parserNumTasks = parserNumTasks.get(configs, Integer.class); + this.errorParallelism = errorParallelism.get(configs, Integer.class); + this.errorNumTasks = errorNumTasks.get(configs, Integer.class); + this.spoutConfig = spoutConfig.get(configs, List.class); + this.securityProtocol = securityProtocol.get(configs, String.class); + this.stormConf = stormConf.get(configs, Config.class); + this.outputTopic = outputTopic.get(configs, String.class); + this.errorTopic = errorTopic.get(configs, String.class); } - public Integer getSpoutParallelism() { + public List<Integer> getSpoutParallelism() { return spoutParallelism; } - public Integer getSpoutNumTasks() { + public List<Integer> getSpoutNumTasks() { return spoutNumTasks; } @@ -271,7 +274,7 @@ public class ParserTopologyCLITest { return errorNumTasks; } - public Map<String, Object> getSpoutConfig() { + public List<Map<String, Object>> getSpoutConfig() { return spoutConfig; } @@ -330,43 +333,116 @@ public class ParserTopologyCLITest { @Test public void testSpoutParallelism() throws Exception { testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM - , "10" - , input -> input.getSpoutParallelism().equals(10) - , () -> { - SensorParserConfig config = getBaseConfig(); - config.setSpoutParallelism(20); - return config; - } - , input -> input.getSpoutParallelism().equals(20) - ); + , "10" + , input -> input.getSpoutParallelism().equals(Collections.singletonList(10)) + , () -> { + SensorParserConfig config = getBaseConfig(); + 
config.setSpoutParallelism(20); + return Collections.singletonList(config); + } + , input -> input.getSpoutParallelism().equals(Collections.singletonList(20)) + ); + } + + @Test + public void testSpoutParallelismMultiple() throws Exception { + // Each spout uses its own parallelism. + // Return one per spout. + List<Integer> spoutParCli = new ArrayList<>(); + spoutParCli.add(10); + spoutParCli.add(12); + List<Integer> spoutParConfig = new ArrayList<>(); + spoutParConfig.add(20); + spoutParConfig.add(30); + testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM + , "10,12" + , input -> input.getSpoutParallelism().equals(spoutParCli) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSpoutParallelism(20); + SensorParserConfig config2 = getBaseConfig(); + config2.setSpoutParallelism(30); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + return configs; + } + , input -> input.getSpoutParallelism().equals(spoutParConfig) + ); } @Test public void testSpoutNumTasks() throws Exception { testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS , "10" - , input -> input.getSpoutNumTasks().equals(10) + , input -> input.getSpoutNumTasks().equals(Collections.singletonList(10)) , () -> { SensorParserConfig config = getBaseConfig(); config.setSpoutNumTasks(20); - return config; + return Collections.singletonList(config); } - , input -> input.getSpoutNumTasks().equals(20) + , input -> input.getSpoutNumTasks().equals(Collections.singletonList(20)) ); } @Test + public void testSpoutNumTasksMultiple() throws Exception { + // Return one per spout.
+ List<Integer> numTasksCli = new ArrayList<>(); + numTasksCli.add(10); + numTasksCli.add(12); + List<Integer> numTasksConfig = new ArrayList<>(); + numTasksConfig.add(20); + numTasksConfig.add(30); + testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS + , "10,12" + , input -> input.getSpoutNumTasks().equals(numTasksCli) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSpoutNumTasks(20); + SensorParserConfig config2 = getBaseConfig(); + config2.setSpoutNumTasks(30); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + return configs; + } + , input -> input.getSpoutNumTasks().equals(numTasksConfig) + ); + } + + @Test public void testParserParallelism() throws Exception { testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM - , "10" - , input -> input.getParserParallelism().equals(10) - , () -> { - SensorParserConfig config = getBaseConfig(); - config.setParserParallelism(20); - return config; - } - , input -> input.getParserParallelism().equals(20) - ); + , "10" + , input -> input.getParserParallelism().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setParserParallelism(20); + return Collections.singletonList(config); + } + , input -> input.getParserParallelism().equals(20) + ); + } + + @Test + public void testParserParallelismMultiple() throws Exception { + // Last one wins + testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM + , "10" + , input -> input.getParserParallelism().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setParserParallelism(20); + SensorParserConfig config2 = getBaseConfig(); + config2.setParserParallelism(30); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + return configs; + } + , input -> input.getParserParallelism().equals(30) + ); } @Test @@ -377,13 +453,32 @@ public class ParserTopologyCLITest { , () -> { 
SensorParserConfig config = getBaseConfig(); config.setParserNumTasks(20); - return config; + SensorParserConfig config2 = getBaseConfig(); + config2.setParserNumTasks(30); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + return configs; } - , input -> input.getParserNumTasks().equals(20) + , input -> input.getParserNumTasks().equals(30) ); } @Test + public void testParserNumTasksMultiple() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS + , "10" + , input -> input.getParserNumTasks().equals(10) + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setParserNumTasks(20); + return Collections.singletonList(config); + } + , input -> input.getParserNumTasks().equals(20) + ); + } + + @Test public void testErrorParallelism() throws Exception { testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM , "10" @@ -391,7 +486,7 @@ public class ParserTopologyCLITest { , () -> { SensorParserConfig config = getBaseConfig(); config.setErrorWriterParallelism(20); - return config; + return Collections.singletonList(config); } , input -> input.getErrorParallelism().equals(20) ); @@ -405,7 +500,7 @@ public class ParserTopologyCLITest { , () -> { SensorParserConfig config = getBaseConfig(); config.setErrorWriterNumTasks(20); - return config; + return Collections.singletonList(config); } , input -> input.getErrorNumTasks().equals(20) ); @@ -419,13 +514,55 @@ public class ParserTopologyCLITest { , () -> { SensorParserConfig config = getBaseConfig(); config.setSecurityProtocol("KERBEROS"); - return config; + return Collections.singletonList(config); } , input -> input.getSecurityProtocol().equals("KERBEROS") ); } @Test + public void testSecurityProtocol_fromCLIMultipleUniform() throws Exception { + testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL + , "PLAINTEXT" + , input -> input.getSecurityProtocol().equals("PLAINTEXT") + , () -> { + 
SensorParserConfig config = getBaseConfig(); + config.setSecurityProtocol("PLAINTEXT"); + SensorParserConfig config2 = getBaseConfig(); + config2.setSecurityProtocol("PLAINTEXT"); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + return configs; + } + , input -> input.getSecurityProtocol().equals("PLAINTEXT") + ); + } + + @Test + public void testSecurityProtocol_fromCLIMultipleMixed() throws Exception { + // Non plaintext wins + testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL + , "PLAINTEXT" + , input -> input.getSecurityProtocol().equals("PLAINTEXT") + , () -> { + SensorParserConfig config = getBaseConfig(); + config.setSecurityProtocol("PLAINTEXT"); + SensorParserConfig config2 = getBaseConfig(); + config2.setSecurityProtocol("KERBEROS"); + SensorParserConfig config3 = getBaseConfig(); + config3.setSecurityProtocol("PLAINTEXT"); + List<SensorParserConfig> configs = new ArrayList<>(); + configs.add(config); + configs.add(config2); + configs.add(config3); + return configs; + } + , input -> input.getSecurityProtocol().equals("KERBEROS") + ); + } + + @Test public void testSecurityProtocol_fromSpout() throws Exception { //Ultimately the order of precedence is CLI > spout config > parser config File extraConfig = File.createTempFile("spoutConfig", "json"); @@ -444,7 +581,7 @@ public class ParserTopologyCLITest { , () -> { SensorParserConfig config = getBaseConfig(); config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK"); - return config; + return Collections.singletonList(config); } , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK") ); @@ -458,7 +595,7 @@ public class ParserTopologyCLITest { , () -> { SensorParserConfig config = getBaseConfig(); config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK"); - return config; + return Collections.singletonList(config); } , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK") ); @@ -481,7 +618,7 @@ public class 
ParserTopologyCLITest { SensorParserConfig config = getBaseConfig(); config.setNumWorkers(100); config.setNumAckers(200); - return config; + return Collections.singletonList(config); } , input -> { Config c = input.getStormConf(); @@ -519,7 +656,7 @@ public class ParserTopologyCLITest { put(Config.TOPOLOGY_ACKER_EXECUTORS, 200); }} ); - return config; + return Collections.singletonList(config); } , input -> { Config c = input.getStormConf(); @@ -542,22 +679,21 @@ public class ParserTopologyCLITest { put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath()); }}; Predicate<ParserInput> cliOverrideExpected = input -> { - return input.getSpoutConfig().get("extra_config").equals("from_file"); + return input.getSpoutConfig().get(0).get("extra_config").equals("from_file"); }; Predicate<ParserInput> configOverrideExpected = input -> { - return input.getSpoutConfig().get("extra_config").equals("from_zk") - ; + return input.getSpoutConfig().get(0).get("extra_config").equals("from_zk"); }; - Supplier<SensorParserConfig> configSupplier = () -> { + Supplier<List<SensorParserConfig>> configSupplier = () -> { SensorParserConfig config = getBaseConfig(); config.setSpoutConfig( new HashMap<String, Object>() {{ put("extra_config", "from_zk"); }} ); - return config; + return Collections.singletonList(config); }; testConfigOption( cliOptions , cliOverrideExpected @@ -573,7 +709,7 @@ public class ParserTopologyCLITest { private void testConfigOption( ParserTopologyCLI.ParserOptions option , String cliOverride , Predicate<ParserInput> cliOverrideCondition - , Supplier<SensorParserConfig> configSupplier + , Supplier<List<SensorParserConfig>> configSupplier , Predicate<ParserInput> configOverrideCondition ) throws Exception { testConfigOption( @@ -588,48 +724,48 @@ public class ParserTopologyCLITest { private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options , Predicate<ParserInput> cliOverrideCondition - , Supplier<SensorParserConfig> 
configSupplier + , Supplier<List<SensorParserConfig>> configSupplier , Predicate<ParserInput> configOverrideCondition ) throws Exception { //CLI Override - SensorParserConfig config = configSupplier.get(); + List<SensorParserConfig> configs = configSupplier.get(); { CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor"); + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor"); for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) { builder.with(entry.getKey(), entry.getValue()); } CommandLine cmd = builder.build(true); - ParserInput input = getInput(cmd, config); + ParserInput input = getInput(cmd, configs); Assert.assertTrue(cliOverrideCondition.test(input)); } // Config Override { CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") - .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor"); + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor"); CommandLine cmd = builder.build(true); - ParserInput input = getInput(cmd, config); + ParserInput input = getInput(cmd, configs); Assert.assertTrue(configOverrideCondition.test(input)); } } - private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception { + private static ParserInput getInput(CommandLine cmd, List<SensorParserConfig> configs ) throws Exception { final ParserInput[] parserInput = new ParserInput[]{null}; new ParserTopologyCLI() { @Override protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl, Optional<String> brokerUrl, - String sensorType, - ValueSupplier<Integer> spoutParallelism, - ValueSupplier<Integer> spoutNumTasks, + List<String> sensorType, + ValueSupplier<List> spoutParallelism, + ValueSupplier<List> 
spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, - ValueSupplier<Map> spoutConfig, + ValueSupplier<List> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, ValueSupplier<String> outputTopic, @@ -647,7 +783,7 @@ public class ParserTopologyCLITest { stormConf, outputTopic, errorTopic, - config + configs ); return null; http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java index 49d7521..788df2d 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.metron.writers.integration; import com.google.common.collect.ImmutableList; +import java.util.Collections; import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -107,7 +108,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT .withParserSensorConfig(sensorType, parserConfig); ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder() - .withSensorType(sensorType) + .withSensorTypes(Collections.singletonList(sensorType)) .withTopologyProperties(topologyProperties) 
.withBrokerUrl(kafkaComponent.getBrokerList()) .withOutputTopic(parserConfig.getOutputTopic()) http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index 99506de..cecba3d 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -95,6 +96,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { * "sensorTopic": "dummy", * "outputTopic": "output", * "errorTopic": "parser_error", + * "readMetadata": true, * "parserConfig": { * "batchSize" : 1, * "columns" : { @@ -148,7 +150,12 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { }}; final Properties topologyProperties = new Properties(); - ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation); + ComponentRunner runner = setupTopologyComponents( + topologyProperties, + Collections.singletonList(sensorType), + Collections.singletonList(parserConfig), + globalConfigWithValidation + ); try { runner.start(); kafkaComponent.writeMessages(sensorType, inputMessages); @@ -172,7 +179,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { 
@Test public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception { - final String sensorType = "dummy"; + final String sensorType = "dummy"; SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class); final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ add(Bytes.toBytes("valid,foo")); @@ -181,7 +188,8 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { }}; final Properties topologyProperties = new Properties(); - ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation); + ComponentRunner runner = setupTopologyComponents(topologyProperties, Collections.singletonList(sensorType), + Collections.singletonList(parserConfig), globalConfigWithValidation); try { runner.start(); kafkaComponent.writeMessages(sensorType, inputMessages); @@ -223,27 +231,31 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { * * @return runner */ - public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType, - SensorParserConfig parserConfig, String globalConfig) { + public ComponentRunner setupTopologyComponents(Properties topologyProperties, List<String> sensorTypes, + List<SensorParserConfig> parserConfigs, String globalConfig) { zkServerComponent = getZKServerComponent(topologyProperties); - kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ - add(new KafkaComponent.Topic(sensorType, 1)); - add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1)); - add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); - }}); + List<KafkaComponent.Topic> topics = new ArrayList<>(); + for(String sensorType : sensorTypes) { + topics.add(new KafkaComponent.Topic(sensorType, 1)); + } + topics.add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); + kafkaComponent = getKafkaComponent(topologyProperties, topics); 
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); configUploadComponent = new ConfigUploadComponent() .withTopologyProperties(topologyProperties) - .withGlobalConfig(globalConfig) - .withParserSensorConfig(sensorType, parserConfig); + .withGlobalConfig(globalConfig); + + for (int i = 0; i < sensorTypes.size(); ++i) { + configUploadComponent.withParserSensorConfig(sensorTypes.get(i), parserConfigs.get(i)); + } parserTopologyComponent = new ParserTopologyComponent.Builder() - .withSensorType(sensorType) + .withSensorTypes(sensorTypes) .withTopologyProperties(topologyProperties) .withBrokerUrl(kafkaComponent.getBrokerList()) - .withErrorTopic(parserConfig.getErrorTopic()) - .withOutputTopic(parserConfig.getOutputTopic()) + .withErrorTopic(parserConfigs.get(0).getErrorTopic()) + .withOutputTopic(parserConfigs.get(0).getOutputTopic()) .build(); return new ComponentRunner.Builder() @@ -325,8 +337,22 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { @Multiline public static String offsetParserConfigJSON; + /** + * { + * "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$DummyObjectParser", + * "sensorTopic":"dummyobjectparser", + * "outputTopic": "enrichments", + * "errorTopic": "parser_error", + * "parserConfig": { + * "batchSize" : 1 + * } + * } + */ + @Multiline + public static String dummyParserConfigJSON; + @Test - public void commits_kafka_offsets_for_emtpy_objects() throws Exception { + public void commits_kafka_offsets_for_empty_objects() throws Exception { final String sensorType = "emptyobjectparser"; SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class); final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ @@ -335,7 +361,11 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { add(Bytes.toBytes("baz")); }}; final Properties topologyProperties = new Properties(); - ComponentRunner runner = 
setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty); + ComponentRunner runner = setupTopologyComponents( + topologyProperties, + Collections.singletonList(sensorType), + Collections.singletonList(parserConfig), + globalConfigEmpty); try { runner.start(); kafkaComponent.writeMessages(sensorType, inputMessages); @@ -356,6 +386,64 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { } } + @Test + public void test_multiple_sensors() throws Exception { + // Setup first sensor + final String emptyObjectSensorType = "emptyobjectparser"; + SensorParserConfig emptyObjectParserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class); + final List<byte[]> emptyObjectInputMessages = new ArrayList<byte[]>() {{ + add(Bytes.toBytes("foo")); + add(Bytes.toBytes("bar")); + add(Bytes.toBytes("baz")); + }}; + + // Setup second sensor + final String dummySensorType = "dummyobjectparser"; + SensorParserConfig dummyParserConfig = JSONUtils.INSTANCE.load(dummyParserConfigJSON, SensorParserConfig.class); + final List<byte[]> dummyInputMessages = new ArrayList<byte[]>() {{ + add(Bytes.toBytes("dummy_foo")); + add(Bytes.toBytes("dummy_bar")); + add(Bytes.toBytes("dummy_baz")); + }}; + + final Properties topologyProperties = new Properties(); + + List<String> sensorTypes = new ArrayList<>(); + sensorTypes.add(emptyObjectSensorType); + sensorTypes.add(dummySensorType); + + List<SensorParserConfig> parserConfigs = new ArrayList<>(); + parserConfigs.add(emptyObjectParserConfig); + parserConfigs.add(dummyParserConfig); + + ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorTypes, parserConfigs, globalConfigEmpty); + try { + runner.start(); + kafkaComponent.writeMessages(emptyObjectSensorType, emptyObjectInputMessages); + kafkaComponent.writeMessages(dummySensorType, dummyInputMessages); + + final List<byte[]> allInputMessages = new ArrayList<>(); + 
+ allInputMessages.addAll(emptyObjectInputMessages); + allInputMessages.addAll(dummyInputMessages); + Processor allResultsProcessor = new AllResultsProcessor(allInputMessages, Constants.ENRICHMENT_TOPIC); + @SuppressWarnings("unchecked") + ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor); + + // validate the output messages + assertThat( + "size should match", + result.getResult().size(), + equalTo(allInputMessages.size())); + for (JSONObject record : result.getResult()) { + assertThat("record should have a guid", record.containsKey("guid"), equalTo(true)); + } + } finally { + if (runner != null) { + runner.stop(); + } + } + } + /** * Goal is to check returning an empty JSONObject in our List returned by parse. */ @@ -380,6 +468,34 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { } } + + /** + * Simple parser that ignores its input and returns a single JSONObject holding one dummy key/value pair; every message validates. + */ + public static class DummyObjectParser implements MessageParser<JSONObject>, Serializable { + + @Override + public void init() { + } + + @SuppressWarnings("unchecked") + @Override + public List<JSONObject> parse(byte[] bytes) { + JSONObject dummy = new JSONObject(); + dummy.put("dummy_key", "dummy_value"); + return ImmutableList.of(dummy); + } + + @Override + public boolean validate(JSONObject message) { + return true; + } + + @Override + public void configure(Map<String, Object> map) { + } + } + /** * Verifies all messages in the provided List of input messages appears in the specified * Kafka output topic http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index
e35960f..7678584 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -23,6 +23,7 @@ import static java.lang.String.format; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -118,7 +119,7 @@ public class BulkWriterComponent<MESSAGE_T> { public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e); MetronError error = new MetronError() - .withSensorType(sensorType) + .withSensorType(Collections.singleton(sensorType)) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java index 0264b3d..c389854 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java @@ -17,11 +17,22 @@ */ package org.apache.metron.writer; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; import org.apache.metron.common.message.MessageGetStrategy; -import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.utils.ErrorUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; @@ -38,19 +49,6 @@ import org.mockito.MockitoAnnotations; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mockStatic; -import static org.powermock.api.mockito.PowerMockito.verifyStatic; - @RunWith(PowerMockRunner.class) @PrepareForTest({BulkWriterComponent.class, ErrorUtils.class}) public class BulkWriterComponentTest { @@ -130,7 +128,7 @@ public class BulkWriterComponentTest { public void writeShouldProperlyHandleWriterErrors() throws Exception { Throwable e = new Exception("test exception"); MetronError error = new MetronError() - .withSensorType(sensorType) + .withSensorType(Collections.singleton(sensorType)) .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); BulkWriterResponse response = new BulkWriterResponse(); response.addAllErrors(e, tupleList); @@ -164,7 
+162,7 @@ public class BulkWriterComponentTest { public void writeShouldProperlyHandleWriterException() throws Exception { Throwable e = new Exception("test exception"); MetronError error = new MetronError() - .withSensorType(sensorType) + .withSensorType(Collections.singleton(sensorType)) .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); BulkWriterResponse response = new BulkWriterResponse(); response.addAllErrors(e, tupleList); @@ -183,10 +181,10 @@ public class BulkWriterComponentTest { public void errorAllShouldClearMapsAndHandleErrors() throws Exception { Throwable e = new Exception("test exception"); MetronError error1 = new MetronError() - .withSensorType("sensor1") + .withSensorType(Collections.singleton("sensor1")) .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1)); MetronError error2 = new MetronError() - .withSensorType("sensor2") + .withSensorType(Collections.singleton("sensor2")) .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2)); BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/use-cases/parser_chaining/README.md ---------------------------------------------------------------------- diff --git a/use-cases/parser_chaining/README.md b/use-cases/parser_chaining/README.md index 26fd333..4055bcd 100644 --- a/use-cases/parser_chaining/README.md +++ b/use-cases/parser_chaining/README.md @@ -233,3 +233,17 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b ``` You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type. + +# Aggregated Parsers with Parser Chaining +Chained parsers can be run as aggregated parsers. 
These parsers continue to use the sensor-specific Kafka topics, and do not do internal routing to the appropriate sensor. + +Instead of creating a topology per sensor, all 3 (`pix_syslog_router`, `cisco-5-304`, and `cisco-6-302`) can be run in a single aggregated parser. It's also possible to aggregate a subset of these parsers (e.g. run `cisco-6-302` as its own topology, and aggregate the other 2). + +The step to start parsers then becomes: +``` +$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router +``` + +The flow through the Storm topology and Kafka topics: + + \ No newline at end of file
