Github user merrimanr commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1213#discussion_r220992847
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         }
       }
     
    -  protected void initializeStellar() {
    -    Context.Builder builder = new Context.Builder()
    -                                
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
    -                                .with(Context.Capabilities.GLOBAL_CONFIG, 
() -> getConfigurations().getGlobalConfig())
    -                                .with(Context.Capabilities.STELLAR_CONFIG, 
() -> getConfigurations().getGlobalConfig())
    -                                ;
    -    if(cache != null) {
    -      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
    -    }
    -    this.stellarContext = builder.build();
    -    StellarFunctions.initialize(stellarContext);
    -  }
    -
     
       @SuppressWarnings("unchecked")
       @Override
       public void execute(Tuple tuple) {
         if (TupleUtils.isTick(tuple)) {
    -      try {
    -        for (Entry<String, ParserComponents> entry : 
sensorToComponentMap.entrySet()) {
    -          entry.getValue().getWriter().flush(getConfigurations(), 
messageGetStrategy);
    -        }
    -      } catch (Exception e) {
    -        throw new RuntimeException(
    -            "This should have been caught in the writerHandler.  If you 
see this, file a JIRA", e);
    -      } finally {
    -        collector.ack(tuple);
    -      }
    +      handleTickTuple(tuple);
           return;
         }
    -
    +    numWritten = 0;
         byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
    +    String topic = 
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
    +    String sensorType = topicToSensorMap.get(topic);
         try {
    -      SensorParserConfig sensorParserConfig;
    -      MessageParser<JSONObject> parser;
    -      String sensor;
    -      Map<String, Object> metadata;
    -      if (sensorToComponentMap.size() == 1) {
    -        // There's only one parser, so grab info directly
    -        Entry<String, ParserComponents> sensorParser = 
sensorToComponentMap.entrySet().iterator()
    -            .next();
    -        sensor = sensorParser.getKey();
    -        parser = sensorParser.getValue().getMessageParser();
    -        sensorParserConfig = getSensorParserConfig(sensor);
    -      } else {
    -        // There's multiple parsers, so pull the topic from the Tuple and 
look up the sensor
    -        String topic = 
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
    -        sensor = topicToSensorMap.get(topic);
    -        parser = sensorToComponentMap.get(sensor).getMessageParser();
    -        sensorParserConfig = getSensorParserConfig(sensor);
    -      }
    +      ParserConfigurations parserConfigurations = getConfigurations();
    +      SensorParserConfig sensorParserConfig = 
parserConfigurations.getSensorParserConfig(sensorType);
    +      RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( 
sensorParserConfig.getRawMessageStrategy()
    +              , tuple
    +              , originalMessage
    +              , sensorParserConfig.getReadMetadata()
    +              , sensorParserConfig.getRawMessageStrategyConfig()
    +      );
    +      parserRunner.setOnSuccess(parserResult -> onSuccess(parserResult, 
tuple));
    +      parserRunner.execute(sensorType, rawMessage, parserConfigurations);
     
    -      List<FieldValidator> fieldValidations = 
getConfigurations().getFieldValidations();
    -      boolean ackTuple = false;
    -      int numWritten = 0;
    -      if (sensorParserConfig != null) {
    -        RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( 
sensorParserConfig.getRawMessageStrategy()
    -            , tuple
    -            , originalMessage
    -            , sensorParserConfig.getReadMetadata()
    -            , sensorParserConfig.getRawMessageStrategyConfig()
    -        );
    -        metadata = rawMessage.getMetadata();
    -
    -        Optional<List<JSONObject>> messages = 
parser.parseOptional(rawMessage.getMessage());
    -        for (JSONObject message : 
messages.orElse(Collections.emptyList())) {
    -          //we want to ack the tuple in the situation where we have are 
not doing a bulk write
    -          //otherwise we want to defer to the writerComponent who will ack 
on bulk commit.
    -          WriterHandler writer = 
sensorToComponentMap.get(sensor).getWriter();
    -          ackTuple = !writer.handleAck();
    -
    -          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
    -              message,
    -              metadata,
    -              sensorParserConfig.getMergeMetadata(),
    -              sensorParserConfig.getRawMessageStrategyConfig()
    -          );
    -          message.put(Constants.SENSOR_TYPE, sensor);
    -
    -          for (FieldTransformer handler : 
sensorParserConfig.getFieldTransformations()) {
    -            if (handler != null) {
    -              if (!sensorParserConfig.getMergeMetadata()) {
    -                //if we haven't merged metadata, then we need to pass them 
along as configuration params.
    -                handler.transformAndUpdate(
    -                    message,
    -                    stellarContext,
    -                    sensorParserConfig.getParserConfig(),
    -                    metadata
    -                );
    -              } else {
    -                handler.transformAndUpdate(
    -                    message,
    -                    stellarContext,
    -                    sensorParserConfig.getParserConfig()
    -                );
    -              }
    -            }
    -          }
    -          if (!message.containsKey(Constants.GUID)) {
    -            message.put(Constants.GUID, UUID.randomUUID().toString());
    -          }
    -
    -          MessageFilter<JSONObject> filter = 
sensorToComponentMap.get(sensor).getFilter();
    -          if (filter == null || filter.emitTuple(message, stellarContext)) 
{
    -            boolean isInvalid = !parser.validate(message);
    -            List<FieldValidator> failedValidators = null;
    -            if (!isInvalid) {
    -              failedValidators = getFailedValidators(message, 
fieldValidations);
    -              isInvalid = !failedValidators.isEmpty();
    -            }
    -            if (isInvalid) {
    -              MetronError error = new MetronError()
    -                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
    -                  .withSensorType(Collections.singleton(sensor))
    -                  .addRawMessage(message);
    -              Set<String> errorFields = failedValidators == null ? null : 
failedValidators.stream()
    -                  .flatMap(fieldValidator -> 
fieldValidator.getInput().stream())
    -                  .collect(Collectors.toSet());
    -              if (errorFields != null && !errorFields.isEmpty()) {
    -                error.withErrorFields(errorFields);
    -              }
    -              ErrorUtils.handleError(collector, error);
    -            } else {
    -              numWritten++;
    -              writer.write(sensor, tuple, message, getConfigurations(), 
messageGetStrategy);
    -            }
    -          }
    -        }
    -      }
           //if we are supposed to ack the tuple OR if we've never passed this 
tuple to the bulk writer
           //(meaning that none of the messages are valid either globally or 
locally)
           //then we want to handle the ack ourselves.
    -      if (ackTuple || numWritten == 0) {
    +      if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 
0) {
             collector.ack(tuple);
           }
    -
         } catch (Throwable ex) {
    -      handleError(originalMessage, tuple, ex, collector);
    +      handleError(sensorType, originalMessage, tuple, ex, collector);
    +      collector.ack(tuple);
         }
       }
     
    -  protected void handleError(byte[] originalMessage, Tuple tuple, 
Throwable ex, OutputCollector collector) {
    +  protected void handleTickTuple(Tuple tuple) {
    +    try {
    +      for (Entry<String, WriterHandler> entry : 
sensorToWriterMap.entrySet()) {
    +        entry.getValue().flush(getConfigurations(), messageGetStrategy);
    +      }
    +    } catch (Exception e) {
    +      throw new RuntimeException(
    +              "This should have been caught in the writerHandler.  If you 
see this, file a JIRA", e);
    +    } finally {
    +      collector.ack(tuple);
    +    }
    +  }
    +
    +  protected void handleError(String sensorType, byte[] originalMessage, 
Tuple tuple, Throwable ex, OutputCollector collector) {
    --- End diff --
    
    The `sensorType` lookup now happens before the `try` block, so the variable is in scope in the `catch` clause and can be passed to `handleError`.


---

Reply via email to