METRON-1681 Decouple the ParserBolt from the Parse execution logic (merrimanr) closes apache/metron#1213
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/28542ad6 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/28542ad6 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/28542ad6 Branch: refs/heads/master Commit: 28542ad64cf63f17b728b4b1c0e995a8973767f7 Parents: 08f3de0 Author: merrimanr <[email protected]> Authored: Thu Oct 18 13:59:52 2018 -0500 Committer: rmerriman <[email protected]> Committed: Thu Oct 18 13:59:52 2018 -0500 ---------------------------------------------------------------------- .../impl/SensorParserConfigServiceImpl.java | 51 +- .../parsers/DefaultParserRunnerResults.java | 71 ++ .../org/apache/metron/parsers/GrokParser.java | 3 +- .../org/apache/metron/parsers/ParserRunner.java | 60 ++ .../apache/metron/parsers/ParserRunnerImpl.java | 322 +++++++ .../metron/parsers/ParserRunnerResults.java | 33 + .../apache/metron/parsers/bolt/ParserBolt.java | 381 +++----- .../parsers/filters/BroMessageFilter.java | 2 +- .../metron/parsers/filters/StellarFilter.java | 2 +- .../parsers/interfaces/MessageFilter.java | 2 +- .../parsers/interfaces/MessageParser.java | 27 +- .../interfaces/MultilineMessageParser.java | 51 -- .../metron/parsers/syslog/Syslog5424Parser.java | 4 +- .../parsers/topology/ParserComponent.java | 56 ++ .../parsers/topology/ParserComponents.java | 67 -- .../parsers/topology/ParserTopologyBuilder.java | 39 +- .../org/apache/metron/filters/FiltersTest.java | 4 +- .../metron/parsers/MessageParserTest.java | 108 ++- .../metron/parsers/ParserRunnerImplTest.java | 390 +++++++++ .../metron/parsers/bolt/ParserBoltTest.java | 859 ++++++------------- .../parsers/integration/ParserDriver.java | 60 +- 21 files changed, 1481 insertions(+), 1111 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java index 4cd272e..d0e4b3d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java @@ -20,12 +20,10 @@ package org.apache.metron.rest.service.impl; import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.fs.Path; import org.apache.metron.common.configuration.ConfigurationType; @@ -35,18 +33,14 @@ import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.interfaces.MessageParserResult; -import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.ParseMessageRequest; import org.apache.metron.rest.service.GrokService; import org.apache.metron.rest.service.SensorParserConfigService; import org.apache.metron.rest.util.ParserIndex; -import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import 
org.apache.zookeeper.KeeperException; import org.json.simple.JSONObject; -import org.reflections.Reflections; -import org.reflections.util.ConfigurationBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -141,53 +135,13 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService } else if (sensorParserConfig.getParserClassName() == null) { throw new RestException("SensorParserConfig must have a parserClassName"); } else { - MultilineMessageParser<JSONObject> parser; - Object parserObject; + MessageParser<JSONObject> parser; try { - parserObject = Class.forName(sensorParserConfig.getParserClassName()) + parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName()) .newInstance(); } catch (Exception e) { throw new RestException(e.toString(), e.getCause()); } - - if (!(parserObject instanceof MultilineMessageParser)) { - parser = new MultilineMessageParser<JSONObject>() { - - @Override - @SuppressWarnings("unchecked") - public void configure(Map<String, Object> config) { - ((MessageParser<JSONObject>)parserObject).configure(config); - } - - @Override - @SuppressWarnings("unchecked") - public void init() { - ((MessageParser<JSONObject>)parserObject).init(); - } - - @Override - @SuppressWarnings("unchecked") - public boolean validate(JSONObject message) { - return ((MessageParser<JSONObject>)parserObject).validate(message); - } - - @Override - @SuppressWarnings("unchecked") - public List<JSONObject> parse(byte[] message) { - return ((MessageParser<JSONObject>)parserObject).parse(message); - } - - @Override - @SuppressWarnings("unchecked") - public Optional<List<JSONObject>> parseOptional(byte[] message) { - return ((MessageParser<JSONObject>)parserObject).parseOptional(message); - } - }; - } else { - parser = (MultilineMessageParser<JSONObject>)parserObject; - } - - Path temporaryGrokPath = null; if (isGrokConfig(sensorParserConfig)) { String name = 
parseMessageRequest.getSensorParserConfig().getSensorTopic(); @@ -195,7 +149,6 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService sensorParserConfig.getParserConfig() .put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString()); } - parser.configure(sensorParserConfig.getParserConfig()); parser.init(); http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java new file mode 100644 index 0000000..79a9b5d --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultParserRunnerResults.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.parsers; + +import org.apache.metron.common.error.MetronError; +import org.json.simple.JSONObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Default implementation of ParserRunnerResults. + */ +public class DefaultParserRunnerResults implements ParserRunnerResults<JSONObject> { + + private List<JSONObject> messages = new ArrayList<>(); + private List<MetronError> errors = new ArrayList<>(); + + public List<JSONObject> getMessages() { + return messages; + } + + public List<MetronError> getErrors() { + return errors; + } + + public void addMessage(JSONObject message) { + this.messages.add(message); + } + + public void addError(MetronError error) { + this.errors.add(error); + } + + public void addErrors(List<MetronError> errors) { + this.errors.addAll(errors); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ParserRunnerResults parserResult = (ParserRunnerResults) o; + return Objects.equals(messages, parserResult.getMessages()) && + Objects.equals(errors, parserResult.getErrors()); + } + + @Override + public int hashCode() { + int result = messages != null ? messages.hashCode() : 0; + result = 31 * result + (errors != null ? 
errors.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index a81149d..6bdfb81 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.interfaces.MessageParserResult; -import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ import java.util.Optional; import java.util.TimeZone; -public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable { +public class GrokParser implements MessageParser<JSONObject>, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java new file mode 100644 index 0000000..f9123b1 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunner.java @@ -0,0 +1,60 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.stellar.dsl.Context; + +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; + +/** + * A ParserRunner is responsible for initializing MessageParsers and parsing messages with the appropriate MessageParser. + * The information needed to initialize a MessageParser is supplied by the parser config supplier. After the parsers + * are initialized, the execute method can then be called for each message and will return a ParserRunnerResults object + * that contains a list of parsed messages and/or a list of errors. + * @param <T> The type of a successfully parsed message. + */ +public interface ParserRunner<T> { + + /** + * Return a list of all sensor types that can be parsed with this ParserRunner. 
+ * @return Sensor types + */ + Set<String> getSensorTypes(); + + /** + * Initializes the MessageParsers for the sensor types handled by this ParserRunner. + * @param parserConfigSupplier Supplies parser configurations + * @param stellarContext Stellar context used to apply Stellar functions during field transformations + */ + void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext); + + /** + * Parses a message and returns the resulting parsed messages and/or errors. + * @param sensorType Sensor type of the message + * @param rawMessage Raw message including metadata + * @param parserConfigurations Parser configurations + * @return ParserRunnerResults containing a list of messages and a list of errors + */ + ParserRunnerResults<T> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java new file mode 100644 index 0000000..a986db7 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerImpl.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.FieldTransformer; +import org.apache.metron.common.configuration.FieldValidator; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.common.utils.ReflectionUtils; +import org.apache.metron.parsers.filters.Filters; +import org.apache.metron.parsers.interfaces.MessageFilter; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.topology.ParserComponent; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * The default implementation of a ParserRunner.
+ */ +public class ParserRunnerImpl implements ParserRunner<JSONObject>, Serializable { + + class ProcessResult { + + private JSONObject message; + private MetronError error; + + public ProcessResult(JSONObject message) { + this.message = message; + } + + public ProcessResult(MetronError error) { + this.error = error; + } + + public JSONObject getMessage() { + return message; + } + + public MetronError getError() { + return error; + } + + public boolean isError() { + return error != null; + } + } + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + protected transient Consumer<ParserRunnerResults> onSuccess; + protected transient Consumer<MetronError> onError; + + private HashSet<String> sensorTypes; + private Map<String, ParserComponent> sensorToParserComponentMap; + + // Stellar variables + private transient Context stellarContext; + + public ParserRunnerImpl(HashSet<String> sensorTypes) { + this.sensorTypes = sensorTypes; + } + + public Map<String, ParserComponent> getSensorToParserComponentMap() { + return sensorToParserComponentMap; + } + + public void setSensorToParserComponentMap(Map<String, ParserComponent> sensorToParserComponentMap) { + this.sensorToParserComponentMap = sensorToParserComponentMap; + } + + public Context getStellarContext() { + return stellarContext; + } + + @Override + public Set<String> getSensorTypes() { + return sensorTypes; + } + + @Override + public void init(Supplier<ParserConfigurations> parserConfigSupplier, Context stellarContext) { + if (parserConfigSupplier == null) { + throw new IllegalStateException("A parser config supplier must be set before initializing the ParserRunner."); + } + if (stellarContext == null) { + throw new IllegalStateException("A stellar context must be set before initializing the ParserRunner."); + } + this.stellarContext = stellarContext; + initializeParsers(parserConfigSupplier); + } + + /** + * Parses messages with the appropriate MessageParser based 
on sensor type. The resulting list of messages are then + * post-processed and added to the ParserRunnerResults message list. Any errors that happen during post-processing are + * added to the ParserRunnerResults error list. Any exceptions (including a master exception) thrown by the MessageParser + * are also added to the ParserRunnerResults error list. + * + * @param sensorType Sensor type of the message + * @param rawMessage Raw message including metadata + * @param parserConfigurations Parser configurations + * @return ParserRunnerResults containing a list of messages and a list of errors + */ + @Override + public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) { + DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + if (sensorParserConfig != null) { + MessageParser<JSONObject> parser = sensorToParserComponentMap.get(sensorType).getMessageParser(); + Optional<MessageParserResult<JSONObject>> optionalMessageParserResult = parser.parseOptionalResult(rawMessage.getMessage()); + if (optionalMessageParserResult.isPresent()) { + MessageParserResult<JSONObject> messageParserResult = optionalMessageParserResult.get(); + + // Process each message returned from the MessageParser + messageParserResult.getMessages().forEach(message -> { + Optional<ProcessResult> processResult = processMessage(sensorType, message, rawMessage, parser, parserConfigurations); + if (processResult.isPresent()) { + if (processResult.get().isError()) { + parserRunnerResults.addError(processResult.get().getError()); + } else { + parserRunnerResults.addMessage(processResult.get().getMessage()); + } + } + }); + + // If a master exception is thrown by the MessageParser, wrap it with a MetronError and add it to the list of errors + messageParserResult.getMasterThrowable().ifPresent(throwable -> 
parserRunnerResults.addError(new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(throwable) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(rawMessage.getMessage()))); + + // If exceptions are thrown by the MessageParser, wrap them with MetronErrors and add them to the list of errors + parserRunnerResults.addErrors(messageParserResult.getMessageThrowables().entrySet().stream().map(entry -> new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(entry.getValue()) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(entry.getKey())).collect(Collectors.toList())); + } + } else { + throw new IllegalStateException(String.format("Could not execute parser. Cannot find configuration for sensor %s.", + sensorType)); + } + return parserRunnerResults; + } + + /** + * Initializes MessageParsers and MessageFilters for sensor types configured in this ParserRunner. Objects are created + * using reflection and the MessageParser configure and init methods are called. + * @param parserConfigSupplier Parser configurations + */ + private void initializeParsers(Supplier<ParserConfigurations> parserConfigSupplier) { + LOG.info("Initializing parsers..."); + sensorToParserComponentMap = new HashMap<>(); + for(String sensorType: sensorTypes) { + if (parserConfigSupplier.get().getSensorParserConfig(sensorType) == null) { + throw new IllegalStateException(String.format("Could not initialize parsers. 
Cannot find configuration for sensor %s.", + sensorType)); + } + + SensorParserConfig parserConfig = parserConfigSupplier.get().getSensorParserConfig(sensorType); + + LOG.info("Creating parser for sensor {} with parser class = {} and filter class = {} ", + sensorType, parserConfig.getParserClassName(), parserConfig.getFilterClassName()); + + // create message parser + MessageParser<JSONObject> parser = ReflectionUtils + .createInstance(parserConfig.getParserClassName()); + + // create message filter + MessageFilter<JSONObject> filter = null; + parserConfig.getParserConfig().putIfAbsent("stellarContext", stellarContext); + if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) { + filter = Filters.get( + parserConfig.getFilterClassName(), + parserConfig.getParserConfig() + ); + } + + parser.configure(parserConfig.getParserConfig()); + parser.init(); + sensorToParserComponentMap.put(sensorType, new ParserComponent(parser, filter)); + } + } + + /** + * Post-processes parsed messages by: + * <ul> + * <li>Applying field transformations defined in the sensor parser config</li> + * <li>Filtering messages using the configured MessageFilter class</li> + * <li>Validating messages using the MessageParser validate method</li> + * </ul> + * If a message is successfully processed a message is returned in a ProcessResult. If a message fails + * validation, a MetronError object is created and returned in a ProcessResult. If a message is + * filtered out an empty Optional is returned. 
+ * + * @param sensorType Sensor type of the message + * @param message Message parsed by the MessageParser + * @param rawMessage Raw message including metadata + * @param parser MessageParser for the sensor type + * @param parserConfigurations Parser configurations + */ + @SuppressWarnings("unchecked") + protected Optional<ProcessResult> processMessage(String sensorType, JSONObject message, RawMessage rawMessage, + MessageParser<JSONObject> parser, + ParserConfigurations parserConfigurations + ) { + Optional<ProcessResult> processResult = Optional.empty(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + sensorParserConfig.getRawMessageStrategy().mergeMetadata( + message, + rawMessage.getMetadata(), + sensorParserConfig.getMergeMetadata(), + sensorParserConfig.getRawMessageStrategyConfig() + ); + message.put(Constants.SENSOR_TYPE, sensorType); + applyFieldTransformations(message, rawMessage, sensorParserConfig); + if (!message.containsKey(Constants.GUID)) { + message.put(Constants.GUID, UUID.randomUUID().toString()); + } + MessageFilter<JSONObject> filter = sensorToParserComponentMap.get(sensorType).getFilter(); + if (filter == null || filter.emit(message, stellarContext)) { + boolean isInvalid = !parser.validate(message); + List<FieldValidator> failedValidators = null; + if (!isInvalid) { + failedValidators = getFailedValidators(message, parserConfigurations); + isInvalid = !failedValidators.isEmpty(); + } + if (isInvalid) { + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(message); + Set<String> errorFields = failedValidators == null ? 
null : failedValidators.stream() + .flatMap(fieldValidator -> fieldValidator.getInput().stream()) + .collect(Collectors.toSet()); + if (errorFields != null && !errorFields.isEmpty()) { + error.withErrorFields(errorFields); + } + processResult = Optional.of(new ProcessResult(error)); + } else { + processResult = Optional.of(new ProcessResult(message)); + } + } + return processResult; + } + + /** + * Applies Stellar field transformations defined in the sensor parser config. + * @param message Message parsed by the MessageParser + * @param rawMessage Raw message including metadata + * @param sensorParserConfig Sensor parser config + */ + private void applyFieldTransformations(JSONObject message, RawMessage rawMessage, SensorParserConfig sensorParserConfig) { + for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { + if (handler != null) { + if (!sensorParserConfig.getMergeMetadata()) { + //if we haven't merged metadata, then we need to pass them along as configuration params. 
+ handler.transformAndUpdate( + message, + stellarContext, + sensorParserConfig.getParserConfig(), + rawMessage.getMetadata() + ); + } else { + handler.transformAndUpdate( + message, + stellarContext, + sensorParserConfig.getParserConfig() + ); + } + } + } + } + + private List<FieldValidator> getFailedValidators(JSONObject message, ParserConfigurations parserConfigurations) { + List<FieldValidator> fieldValidations = parserConfigurations.getFieldValidations(); + List<FieldValidator> failedValidators = new ArrayList<>(); + for(FieldValidator validator : fieldValidations) { + if(!validator.isValid(message, parserConfigurations.getGlobalConfig(), stellarContext)) { + failedValidators.add(validator); + } + } + return failedValidators; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java new file mode 100644 index 0000000..7ca853c --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/ParserRunnerResults.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.metron.common.error.MetronError; + +import java.util.List; + +/** + * Container for the results of parsing a message with a ParserRunner. + * @param <T> The type of a successfully parsed message. + */ +public interface ParserRunnerResults<T> { + + List<T> getMessages(); + + List<MetronError> getErrors(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 05334c2..a9ee305 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -18,13 +18,18 @@ package org.apache.metron.parsers.bolt; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Function; import com.github.benmanes.caffeine.cache.Cache; -import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; -import org.apache.metron.common.configuration.FieldTransformer; -import 
org.apache.metron.common.configuration.FieldValidator; +import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; @@ -33,12 +38,8 @@ import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.message.metadata.RawMessage; import org.apache.metron.common.message.metadata.RawMessageUtil; import org.apache.metron.common.utils.ErrorUtils; -import org.apache.metron.parsers.filters.Filters; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.apache.metron.parsers.interfaces.MessageParserResult; -import org.apache.metron.parsers.interfaces.MultilineMessageParser; -import org.apache.metron.parsers.topology.ParserComponents; +import org.apache.metron.parsers.ParserRunner; +import org.apache.metron.parsers.ParserRunnerResults; import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; @@ -56,45 +57,32 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; - public class ParserBolt extends ConfiguredParserBolt implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private OutputCollector collector; - private Map<String, ParserComponents> sensorToComponentMap; + private 
ParserRunner<JSONObject> parserRunner; + private Map<String, WriterHandler> sensorToWriterMap; private Map<String, String> topicToSensorMap = new HashMap<>(); - private Context stellarContext; private transient MessageGetStrategy messageGetStrategy; - private transient Cache<CachingStellarProcessor.Key, Object> cache; private int requestedTickFreqSecs; private int defaultBatchTimeout; private int batchTimeoutDivisor = 1; public ParserBolt( String zookeeperUrl - , Map<String, ParserComponents> sensorToComponentMap + , ParserRunner parserRunner + , Map<String, WriterHandler> sensorToWriterMap ) { super(zookeeperUrl); - this.sensorToComponentMap = sensorToComponentMap; + this.parserRunner = parserRunner; + this.sensorToWriterMap = sensorToWriterMap; // Ensure that all sensors are either bulk sensors or not bulk sensors. Can't mix and match. Boolean handleAcks = null; - for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) { - boolean writerHandleAck = entry.getValue().getWriter().handleAck(); + for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) { + boolean writerHandleAck = entry.getValue().handleAck(); if (handleAcks == null) { handleAcks = writerHandleAck; } else if (!handleAcks.equals(writerHandleAck)) { @@ -130,21 +118,44 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { /** * Used only for unit testing - * @param defaultBatchTimeout */ - protected void setDefaultBatchTimeout(int defaultBatchTimeout) { - this.defaultBatchTimeout = defaultBatchTimeout; + public int getBatchTimeoutDivisor() { + return batchTimeoutDivisor; } /** * Used only for unit testing */ - public int getDefaultBatchTimeout() { - return defaultBatchTimeout; + protected void setSensorToWriterMap(Map<String, WriterHandler> sensorToWriterMap) { + this.sensorToWriterMap = sensorToWriterMap; } - public Map<String, ParserComponents> getSensorToComponentMap() { - return sensorToComponentMap; + /** + * Used only for 
unit testing + */ + protected Map<String, String> getTopicToSensorMap() { + return topicToSensorMap; + } + + /** + * Used only for unit testing + */ + protected void setTopicToSensorMap(Map<String, String> topicToSensorMap) { + this.topicToSensorMap = topicToSensorMap; + } + + /** + * Used only for unit testing + */ + public void setMessageGetStrategy(MessageGetStrategy messageGetStrategy) { + this.messageGetStrategy = messageGetStrategy; + } + + /** + * Used only for unit testing + */ + public void setOutputCollector(OutputCollector collector) { + this.collector = collector; } /** @@ -159,7 +170,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { // to get the valid WriterConfiguration. But don't store any non-serializable objects, // else Storm will throw a runtime error. Function<WriterConfiguration, WriterConfiguration> configurationXform; - WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter(); + WriterHandler writer = sensorToWriterMap.entrySet().iterator().next().getValue(); if (writer.isWriterToBulkWriter()) { configurationXform = WriterToBulkWriter.TRANSFORMATION; } else { @@ -189,37 +200,11 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { super.prepare(stormConf, context, collector); messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get(); this.collector = collector; - - // Build the Stellar cache - Map<String, Object> cacheConfig = new HashMap<>(); - for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) { - String sensor = entry.getKey(); - SensorParserConfig config = getSensorParserConfig(sensor); - - if (config != null) { - cacheConfig.putAll(config.getCacheConfig()); - } - } - cache = CachingStellarProcessor.createCache(cacheConfig); + this.parserRunner.init(this::getConfigurations, initializeStellar()); // Need to prep all sensors - for (Map.Entry<String, ParserComponents> entry: 
sensorToComponentMap.entrySet()) { + for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) { String sensor = entry.getKey(); - MessageParser<JSONObject> parser = entry.getValue().getMessageParser(); - - initializeStellar(); - if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) { - getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext); - if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) { - MessageFilter<JSONObject> filter = Filters.get( - getSensorParserConfig(sensor).getFilterClassName(), - getSensorParserConfig(sensor).getParserConfig() - ); - getSensorToComponentMap().get(sensor).setFilter(filter); - } - } - - parser.init(); SensorParserConfig config = getSensorParserConfig(sensor); if (config != null) { @@ -229,9 +214,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { throw new IllegalStateException( "Unable to retrieve a parser config for " + sensor); } - parser.configure(config.getParserConfig()); - WriterHandler writer = sensorToComponentMap.get(sensor).getWriter(); + WriterHandler writer = sensorToWriterMap.get(sensor); writer.init(stormConf, context, collector, getConfigurations()); if (defaultBatchTimeout == 0) { //This means getComponentConfiguration was never called to initialize defaultBatchTimeout, @@ -246,225 +230,106 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { } } - protected void initializeStellar() { - Context.Builder builder = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) - .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) - ; - if(cache != null) { - builder = builder.with(Context.Capabilities.CACHE, () -> cache); - } - this.stellarContext = builder.build(); - 
StellarFunctions.initialize(stellarContext); - } - @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { if (TupleUtils.isTick(tuple)) { - try { - for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) { - entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy); - } - } catch (Exception e) { - throw new RuntimeException( - "This should have been caught in the writerHandler. If you see this, file a JIRA", e); - } finally { - collector.ack(tuple); - } + handleTickTuple(tuple); return; } - byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple); + String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); + String sensorType = topicToSensorMap.get(topic); try { - SensorParserConfig sensorParserConfig; - MessageParser<JSONObject> parser; - String sensor; - Map<String, Object> metadata; - if (sensorToComponentMap.size() == 1) { - // There's only one parser, so grab info directly - Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator() - .next(); - sensor = sensorParser.getKey(); - parser = sensorParser.getValue().getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } else { - // There's multiple parsers, so pull the topic from the Tuple and look up the sensor - String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); - sensor = topicToSensorMap.get(topic); - parser = sensorToComponentMap.get(sensor).getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } + ParserConfigurations parserConfigurations = getConfigurations(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy() + , tuple + , originalMessage + , sensorParserConfig.getReadMetadata() + , sensorParserConfig.getRawMessageStrategyConfig() + ); + 
ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations); + long numWritten = parserRunnerResults.getMessages().stream() + .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector)) + .filter(result -> result) + .count(); + parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error)); - List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations(); - boolean ackTuple = false; - int numWritten = 0; - if (sensorParserConfig != null) { - RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy() - , tuple - , originalMessage - , sensorParserConfig.getReadMetadata() - , sensorParserConfig.getRawMessageStrategyConfig() - ); - - metadata = rawMessage.getMetadata(); - - MultilineMessageParser mmp = null; - if (!(parser instanceof MultilineMessageParser)) { - mmp = new MultilineMessageParser<JSONObject>() { - - @Override - public void configure(Map<String, Object> config) { - parser.configure(config); - } - - @Override - public void init() { - parser.init(); - } - - @Override - public boolean validate(JSONObject message) { - return parser.validate(message); - } - - @Override - public List<JSONObject> parse(byte[] message) { - return parser.parse(message); - } - - @Override - public Optional<List<JSONObject>> parseOptional(byte[] message) { - return parser.parseOptional(message); - } - }; - } else { - mmp = (MultilineMessageParser) parser; - } - - Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage()); - - // check if there is a master error - if (results.isPresent() && results.get().getMasterThrowable().isPresent()) { - handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector); - return; - } - - // Handle the message results - List<JSONObject> messages = results.isPresent() ? 
results.get().getMessages() : Collections.EMPTY_LIST; - for (JSONObject message : messages) { - //we want to ack the tuple in the situation where we have are not doing a bulk write - //otherwise we want to defer to the writerComponent who will ack on bulk commit. - WriterHandler writer = sensorToComponentMap.get(sensor).getWriter(); - ackTuple = !writer.handleAck(); - - sensorParserConfig.getRawMessageStrategy().mergeMetadata( - message, - metadata, - sensorParserConfig.getMergeMetadata(), - sensorParserConfig.getRawMessageStrategyConfig() - ); - message.put(Constants.SENSOR_TYPE, sensor); - - for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { - if (handler != null) { - if (!sensorParserConfig.getMergeMetadata()) { - //if we haven't merged metadata, then we need to pass them along as configuration params. - handler.transformAndUpdate( - message, - stellarContext, - sensorParserConfig.getParserConfig(), - metadata - ); - } else { - handler.transformAndUpdate( - message, - stellarContext, - sensorParserConfig.getParserConfig() - ); - } - } - } - if (!message.containsKey(Constants.GUID)) { - message.put(Constants.GUID, UUID.randomUUID().toString()); - } - - MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter(); - if (filter == null || filter.emitTuple(message, stellarContext)) { - boolean isInvalid = !parser.validate(message); - List<FieldValidator> failedValidators = null; - if (!isInvalid) { - failedValidators = getFailedValidators(message, fieldValidations); - isInvalid = !failedValidators.isEmpty(); - } - if (isInvalid) { - MetronError error = new MetronError() - .withErrorType(Constants.ErrorType.PARSER_INVALID) - .withSensorType(Collections.singleton(sensor)) - .addRawMessage(message); - Set<String> errorFields = failedValidators == null ? 
null : failedValidators.stream() - .flatMap(fieldValidator -> fieldValidator.getInput().stream()) - .collect(Collectors.toSet()); - if (errorFields != null && !errorFields.isEmpty()) { - error.withErrorFields(errorFields); - } - ErrorUtils.handleError(collector, error); - } else { - numWritten++; - writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy); - } - } - } - - // Handle the error results - Map<Object, Throwable> messageErrors = results.isPresent() - ? results.get().getMessageThrowables() : Collections.EMPTY_MAP; - - for (Entry<Object,Throwable> entry : messageErrors.entrySet()) { - MetronError error = new MetronError() - .withErrorType(Constants.ErrorType.PARSER_ERROR) - .withThrowable(entry.getValue()) - .withSensorType(sensorToComponentMap.keySet()) - .addRawMessage(originalMessage); - ErrorUtils.handleError(collector, error); - } - } //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer //(meaning that none of the messages are valid either globally or locally) //then we want to handle the ack ourselves. 
- if (ackTuple || numWritten == 0) { + if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) { collector.ack(tuple); } } catch (Throwable ex) { - handleError(originalMessage, tuple, ex, collector); + handleError(sensorType, originalMessage, tuple, ex, collector); + collector.ack(tuple); } } - protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + protected Context initializeStellar() { + Map<String, Object> cacheConfig = new HashMap<>(); + for (String sensorType: this.parserRunner.getSensorTypes()) { + SensorParserConfig config = getSensorParserConfig(sensorType); + + if (config != null) { + cacheConfig.putAll(config.getCacheConfig()); + } + } + Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(cacheConfig); + + Context.Builder builder = new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) + .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) + ; + if(cache != null) { + builder = builder.with(Context.Capabilities.CACHE, () -> cache); + } + Context stellarContext = builder.build(); + StellarFunctions.initialize(stellarContext); + return stellarContext; + } + + protected void handleTickTuple(Tuple tuple) { + try { + for (Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) { + entry.getValue().flush(getConfigurations(), messageGetStrategy); + } + } catch (Exception e) { + throw new RuntimeException( + "This should have been caught in the writerHandler. 
If you see this, file a JIRA", e); + } finally { + collector.ack(tuple); + } + } + + protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) { + WriterHandler writer = sensorToWriterMap.get(sensorType); + try { + writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy); + return true; + } catch (Exception ex) { + handleError(sensorType, originalMessage, tuple, ex, collector); + return false; + } + } + + protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_ERROR) .withThrowable(ex) - .withSensorType(sensorToComponentMap.keySet()) + .withSensorType(Collections.singleton(sensorType)) .addRawMessage(originalMessage); ErrorUtils.handleError(collector, error); - collector.ack(tuple); - } - - private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) { - List<FieldValidator> failedValidators = new ArrayList<>(); - for(FieldValidator validator : validators) { - if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) { - failedValidators.add(validator); - } - } - return failedValidators; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(Constants.ERROR_STREAM, new Fields("message")); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java index 9cdafa3..1fa1feb 100644 --- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java @@ -67,7 +67,7 @@ public class BroMessageFilter implements MessageFilter<JSONObject>{ */ @Override - public boolean emitTuple(JSONObject message, Context context) { + public boolean emit(JSONObject message, Context context) { String protocol = (String) message.get(_key); return _known_protocols.contains(protocol); } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java index 15a035a..8300ff4 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java @@ -54,7 +54,7 @@ public class StellarFilter implements MessageFilter<JSONObject> { } @Override - public boolean emitTuple(JSONObject message, Context context) { + public boolean emit(JSONObject message, Context context) { VariableResolver resolver = new MapVariableResolver(message); return processor.parse(query, resolver, functionResolver, context); } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java index b7b91c0..207c070 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java @@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context; public interface MessageFilter<T> extends Configurable{ - boolean emitTuple(T message, Context context); + boolean emit(T message, Context context); } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java index 665076b..c9f8351 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.interfaces; +import org.apache.commons.lang3.NotImplementedException; import org.apache.metron.parsers.DefaultMessageParserResult; import java.io.Serializable; @@ -38,18 +39,42 @@ public interface MessageParser<T> extends Configurable { * @param rawMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ - List<T> parse(byte[] rawMessage); + @Deprecated + default List<T> parse(byte[] rawMessage) { + throw new NotImplementedException("parse is not implemented"); + } /** * Take raw data and convert it to an optional list of messages. 
* @param parseMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ + @Deprecated default Optional<List<T>> parseOptional(byte[] parseMessage) { return Optional.ofNullable(parse(parseMessage)); } /** + * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore + * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced + * and the errors. + * @param parseMessage the raw bytes of the message + * @return Optional of {@link MessageParserResult} + */ + default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) { + Optional<MessageParserResult<T>> result = Optional.empty(); + try { + Optional<List<T>> optionalMessages = parseOptional(parseMessage); + if (optionalMessages.isPresent()) { + result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get())); + } + } catch (Throwable t) { + return Optional.of(new DefaultMessageParserResult<>(t)); + } + return result; + } + + /** * Validate the message to ensure that it's correct. * @param message the message to validate * @return true if the message is valid, false if not http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java deleted file mode 100644 index 7818f9a..0000000 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.parsers.interfaces; - -import org.apache.commons.lang3.NotImplementedException; -import org.apache.metron.parsers.DefaultMessageParserResult; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public interface MultilineMessageParser<T> extends MessageParser<T> { - - default List<T> parse(byte[] rawMessage) { - throw new NotImplementedException("parse is not implemented"); - } - - /** - * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore - * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced - * and the errors. 
- * @param parseMessage the raw bytes of the message - * @return Optional of {@link MessageParserResult} - */ - default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) { - List<T> list = new ArrayList<>(); - try { - Optional<List<T>> optionalMessages = parseOptional(parseMessage); - optionalMessages.ifPresent(list::addAll); - } catch (Throwable t) { - return Optional.of(new DefaultMessageParserResult<>(t)); - } - return Optional.of(new DefaultMessageParserResult<T>(list)); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java index 79a082a..5b62e85 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java @@ -26,8 +26,8 @@ import com.github.palindromicity.syslog.dsl.SyslogFieldKeys; import org.apache.commons.lang3.StringUtils; import org.apache.metron.parsers.BasicParser; import org.apache.metron.parsers.DefaultMessageParserResult; +import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.interfaces.MessageParserResult; -import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ import java.util.Optional; /** * Parser for well structured RFC 5424 messages. 
*/ -public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable { +public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String NIL_POLICY_CONFIG = "nilPolicy"; private transient SyslogParser syslogParser; http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java new file mode 100644 index 0000000..eb5ff9f --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.parsers.topology; + +import org.apache.metron.parsers.interfaces.MessageFilter; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.json.simple.JSONObject; + +import java.io.Serializable; + +public class ParserComponent implements Serializable { + private static final long serialVersionUID = 7880346740026374665L; + + private MessageParser<JSONObject> messageParser; + private MessageFilter<JSONObject> filter; + + public ParserComponent( + MessageParser<JSONObject> messageParser, + MessageFilter<JSONObject> filter) { + this.messageParser = messageParser; + this.filter = filter; + } + + public MessageParser<JSONObject> getMessageParser() { + return messageParser; + } + + public MessageFilter<JSONObject> getFilter() { + return filter; + } + + public void setMessageParser( + MessageParser<JSONObject> messageParser) { + this.messageParser = messageParser; + } + + public void setFilter( + MessageFilter<JSONObject> filter) { + this.filter = filter; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java deleted file mode 100644 index 32d56b9..0000000 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.parsers.topology; - -import java.io.Serializable; -import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.json.simple.JSONObject; - -public class ParserComponents implements Serializable { - private static final long serialVersionUID = 7880346740026374665L; - - private MessageParser<JSONObject> messageParser; - private MessageFilter<JSONObject> filter; - private WriterHandler writer; - - public ParserComponents( - MessageParser<JSONObject> messageParser, - MessageFilter<JSONObject> filter, - WriterHandler writer) { - this.messageParser = messageParser; - this.filter = filter; - this.writer = writer; - } - - public MessageParser<JSONObject> getMessageParser() { - return messageParser; - } - - public MessageFilter<JSONObject> getFilter() { - return filter; - } - - public WriterHandler getWriter() { - return writer; - } - - public void setMessageParser( - MessageParser<JSONObject> messageParser) { - this.messageParser = messageParser; - } - - public void setFilter( - MessageFilter<JSONObject> filter) { - this.filter = filter; - } - - public void setWriter(WriterHandler writer) { - this.writer = writer; - } -} 
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index d20e1a5..9dc7b88 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -21,11 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.metron.common.Constants; @@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.common.utils.ReflectionUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.MessageWriter; +import org.apache.metron.parsers.ParserRunnerImpl; import org.apache.metron.parsers.bolt.ParserBolt; import org.apache.metron.parsers.bolt.WriterBolt; import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.filters.Filters; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; @@ -268,29 +266,14 @@ public class ParserTopologyBuilder { Optional<String> securityProtocol, ParserConfigurations configs, Optional<String> outputTopic) { - - Map<String, ParserComponents> parserBoltConfigs = new HashMap<>(); + Map<String, WriterHandler> writerConfigs = new HashMap<>(); for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) { String sensorType = entry.getKey(); SensorParserConfig parserConfig = entry.getValue(); - // create message parser - MessageParser<JSONObject> parser = ReflectionUtils - .createInstance(parserConfig.getParserClassName()); - parser.configure(parserConfig.getParserConfig()); - - // create message filter - MessageFilter<JSONObject> filter = null; - if 
(!StringUtils.isEmpty(parserConfig.getFilterClassName())) { - filter = Filters.get( - parserConfig.getFilterClassName(), - parserConfig.getParserConfig() - ); - } // create a writer AbstractWriter writer; if (parserConfig.getWriterClassName() == null) { - // if not configured, use a sensible default writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol) .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)); @@ -304,16 +287,10 @@ public class ParserTopologyBuilder { // create a writer handler WriterHandler writerHandler = createWriterHandler(writer); - - ParserComponents components = new ParserComponents( - parser, - filter, - writerHandler - ); - parserBoltConfigs.put(sensorType, components); + writerConfigs.put(sensorType, writerHandler); } - return new ParserBolt(zookeeperUrl, parserBoltConfigs); + return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java index 8441409..2f3784a 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java @@ -42,8 +42,8 @@ public class FiltersTest { put("filter.query", "exists(foo)"); }}; MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config); - Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT())); - Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT())); + 
Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT())); + Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT())); } } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java index 9769baa..4842a1f 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java @@ -19,68 +19,118 @@ package org.apache.metron.parsers; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; public class MessageParserTest { - @Test - public void testNullable() throws Exception { - MessageParser parser = new MessageParser() { - @Override - public void init() { - } + abstract class TestMessageParser implements MessageParser<JSONObject> { + @Override + public void init() { + } - @Override - public List parse(byte[] rawMessage) { - return null; - } + @Override + public boolean validate(JSONObject message) { + return false; + } - @Override - public boolean validate(Object message) { - return false; - } + @Override + public void configure(Map<String, Object> config) { - @Override - public void configure(Map<String, Object> config) { + } + } + @Test + public void testNullable() throws Exception { 
+ MessageParser parser = new TestMessageParser() { + @Override + public List<JSONObject> parse(byte[] rawMessage) { + return null; } }; - Assert.assertNotNull(parser.parseOptional(null)); - Assert.assertFalse(parser.parseOptional(null).isPresent()); + Assert.assertNotNull(parser.parseOptionalResult(null)); + Assert.assertFalse(parser.parseOptionalResult(null).isPresent()); } @Test public void testNotNullable() throws Exception { - MessageParser parser = new MessageParser() { + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public void init() { - + public List<JSONObject> parse(byte[] rawMessage) { + return new ArrayList<>(); } + }; + Assert.assertNotNull(parser.parseOptionalResult(null)); + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null); + Assert.assertTrue(ret.isPresent()); + Assert.assertEquals(0, ret.get().getMessages().size()); + } + @Test + public void testParse() { + JSONObject message = new JSONObject(); + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public List parse(byte[] rawMessage) { - return new ArrayList<>(); + public List<JSONObject> parse(byte[] rawMessage) { + return Collections.singletonList(message); } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + Assert.assertEquals(1, ret.get().getMessages().size()); + Assert.assertEquals(message, ret.get().getMessages().get(0)); + } + @Test + public void testParseOptional() { + JSONObject message = new JSONObject(); + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public boolean validate(Object message) { - return false; + public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) { + return Optional.of(Collections.singletonList(message)); } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + 
Assert.assertEquals(1, ret.get().getMessages().size()); + Assert.assertEquals(message, ret.get().getMessages().get(0)); + } + @Test + public void testParseException() { + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public void configure(Map<String, Object> config) { + public List<JSONObject> parse(byte[] rawMessage) { + throw new RuntimeException("parse exception"); + } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + Assert.assertTrue(ret.get().getMasterThrowable().isPresent()); + Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage()); + } + @Test + public void testParseOptionalException() { + MessageParser<JSONObject> parser = new TestMessageParser() { + @Override + public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) { + throw new RuntimeException("parse exception"); } }; - Assert.assertNotNull(parser.parseOptional(null)); - Optional<List> ret = parser.parseOptional(null); + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); Assert.assertTrue(ret.isPresent()); - Assert.assertEquals(0, ret.get().size()); + Assert.assertTrue(ret.get().getMasterThrowable().isPresent()); + Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage()); } + }
