Github user merrimanr commented on a diff in the pull request:
https://github.com/apache/metron/pull/1213#discussion_r220992847
--- Diff:
metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
---
@@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext
context, OutputCollector coll
}
}
- protected void initializeStellar() {
- Context.Builder builder = new Context.Builder()
-
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG,
() -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG,
() -> getConfigurations().getGlobalConfig())
- ;
- if(cache != null) {
- builder = builder.with(Context.Capabilities.CACHE, () -> cache);
- }
- this.stellarContext = builder.build();
- StellarFunctions.initialize(stellarContext);
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
- try {
- for (Entry<String, ParserComponents> entry :
sensorToComponentMap.entrySet()) {
- entry.getValue().getWriter().flush(getConfigurations(),
messageGetStrategy);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "This should have been caught in the writerHandler. If you
see this, file a JIRA", e);
- } finally {
- collector.ack(tuple);
- }
+ handleTickTuple(tuple);
return;
}
-
+ numWritten = 0;
byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+ String topic =
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+ String sensorType = topicToSensorMap.get(topic);
try {
- SensorParserConfig sensorParserConfig;
- MessageParser<JSONObject> parser;
- String sensor;
- Map<String, Object> metadata;
- if (sensorToComponentMap.size() == 1) {
- // There's only one parser, so grab info directly
- Entry<String, ParserComponents> sensorParser =
sensorToComponentMap.entrySet().iterator()
- .next();
- sensor = sensorParser.getKey();
- parser = sensorParser.getValue().getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- } else {
- // There's multiple parsers, so pull the topic from the Tuple and
look up the sensor
- String topic =
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
- sensor = topicToSensorMap.get(topic);
- parser = sensorToComponentMap.get(sensor).getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- }
+ ParserConfigurations parserConfigurations = getConfigurations();
+ SensorParserConfig sensorParserConfig =
parserConfigurations.getSensorParserConfig(sensorType);
+ RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage(
sensorParserConfig.getRawMessageStrategy()
+ , tuple
+ , originalMessage
+ , sensorParserConfig.getReadMetadata()
+ , sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ parserRunner.setOnSuccess(parserResult -> onSuccess(parserResult,
tuple));
+ parserRunner.execute(sensorType, rawMessage, parserConfigurations);
- List<FieldValidator> fieldValidations =
getConfigurations().getFieldValidations();
- boolean ackTuple = false;
- int numWritten = 0;
- if (sensorParserConfig != null) {
- RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage(
sensorParserConfig.getRawMessageStrategy()
- , tuple
- , originalMessage
- , sensorParserConfig.getReadMetadata()
- , sensorParserConfig.getRawMessageStrategyConfig()
- );
- metadata = rawMessage.getMetadata();
-
- Optional<List<JSONObject>> messages =
parser.parseOptional(rawMessage.getMessage());
- for (JSONObject message :
messages.orElse(Collections.emptyList())) {
- //we want to ack the tuple in the situation where we have are
not doing a bulk write
- //otherwise we want to defer to the writerComponent who will ack
on bulk commit.
- WriterHandler writer =
sensorToComponentMap.get(sensor).getWriter();
- ackTuple = !writer.handleAck();
-
- sensorParserConfig.getRawMessageStrategy().mergeMetadata(
- message,
- metadata,
- sensorParserConfig.getMergeMetadata(),
- sensorParserConfig.getRawMessageStrategyConfig()
- );
- message.put(Constants.SENSOR_TYPE, sensor);
-
- for (FieldTransformer handler :
sensorParserConfig.getFieldTransformations()) {
- if (handler != null) {
- if (!sensorParserConfig.getMergeMetadata()) {
- //if we haven't merged metadata, then we need to pass them
along as configuration params.
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig(),
- metadata
- );
- } else {
- handler.transformAndUpdate(
- message,
- stellarContext,
- sensorParserConfig.getParserConfig()
- );
- }
- }
- }
- if (!message.containsKey(Constants.GUID)) {
- message.put(Constants.GUID, UUID.randomUUID().toString());
- }
-
- MessageFilter<JSONObject> filter =
sensorToComponentMap.get(sensor).getFilter();
- if (filter == null || filter.emitTuple(message, stellarContext))
{
- boolean isInvalid = !parser.validate(message);
- List<FieldValidator> failedValidators = null;
- if (!isInvalid) {
- failedValidators = getFailedValidators(message,
fieldValidations);
- isInvalid = !failedValidators.isEmpty();
- }
- if (isInvalid) {
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.PARSER_INVALID)
- .withSensorType(Collections.singleton(sensor))
- .addRawMessage(message);
- Set<String> errorFields = failedValidators == null ? null :
failedValidators.stream()
- .flatMap(fieldValidator ->
fieldValidator.getInput().stream())
- .collect(Collectors.toSet());
- if (errorFields != null && !errorFields.isEmpty()) {
- error.withErrorFields(errorFields);
- }
- ErrorUtils.handleError(collector, error);
- } else {
- numWritten++;
- writer.write(sensor, tuple, message, getConfigurations(),
messageGetStrategy);
- }
- }
- }
- }
//if we are supposed to ack the tuple OR if we've never passed this
tuple to the bulk writer
//(meaning that none of the messages are valid either globally or
locally)
//then we want to handle the ack ourselves.
- if (ackTuple || numWritten == 0) {
+ if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten ==
0) {
collector.ack(tuple);
}
-
} catch (Throwable ex) {
- handleError(originalMessage, tuple, ex, collector);
+ handleError(sensorType, originalMessage, tuple, ex, collector);
+ collector.ack(tuple);
}
}
- protected void handleError(byte[] originalMessage, Tuple tuple,
Throwable ex, OutputCollector collector) {
+ protected void handleTickTuple(Tuple tuple) {
+ try {
+ for (Entry<String, WriterHandler> entry :
sensorToWriterMap.entrySet()) {
+ entry.getValue().flush(getConfigurations(), messageGetStrategy);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "This should have been caught in the writerHandler. If you
see this, file a JIRA", e);
+ } finally {
+ collector.ack(tuple);
+ }
+ }
+
+ protected void handleError(String sensorType, byte[] originalMessage,
Tuple tuple, Throwable ex, OutputCollector collector) {
--- End diff --
The `sensorType` lookup now happens outside of the `try` block, so it is in scope in the `catch` block and can be passed in to `handleError`.
---