Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1213#discussion_r222736631
--- Diff:
metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
---
@@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext
context, OutputCollector coll
}
}
- protected void initializeStellar() {
- Context.Builder builder = new Context.Builder()
-
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG,
() -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG,
() -> getConfigurations().getGlobalConfig())
- ;
- if(cache != null) {
- builder = builder.with(Context.Capabilities.CACHE, () -> cache);
- }
- this.stellarContext = builder.build();
- StellarFunctions.initialize(stellarContext);
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
- try {
- for (Entry<String, ParserComponents> entry :
sensorToComponentMap.entrySet()) {
- entry.getValue().getWriter().flush(getConfigurations(),
messageGetStrategy);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "This should have been caught in the writerHandler. If you
see this, file a JIRA", e);
- } finally {
- collector.ack(tuple);
- }
+ handleTickTuple(tuple);
return;
}
-
+ numWritten = 0;
byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+ String topic =
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+ String sensorType = topicToSensorMap.get(topic);
try {
- SensorParserConfig sensorParserConfig;
- MessageParser<JSONObject> parser;
- String sensor;
- Map<String, Object> metadata;
- if (sensorToComponentMap.size() == 1) {
- // There's only one parser, so grab info directly
- Entry<String, ParserComponents> sensorParser =
sensorToComponentMap.entrySet().iterator()
- .next();
- sensor = sensorParser.getKey();
- parser = sensorParser.getValue().getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- } else {
- // There's multiple parsers, so pull the topic from the Tuple and
look up the sensor
- String topic =
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
- sensor = topicToSensorMap.get(topic);
- parser = sensorToComponentMap.get(sensor).getMessageParser();
- sensorParserConfig = getSensorParserConfig(sensor);
- }
+ ParserConfigurations parserConfigurations = getConfigurations();
+ SensorParserConfig sensorParserConfig =
parserConfigurations.getSensorParserConfig(sensorType);
+ RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage(
sensorParserConfig.getRawMessageStrategy()
+ , tuple
+ , originalMessage
+ , sensorParserConfig.getReadMetadata()
+ , sensorParserConfig.getRawMessageStrategyConfig()
+ );
+ parserRunner.setOnSuccess(parserResult -> onSuccess(parserResult,
tuple));
--- End diff --
Ah, that's right. This answers my earlier comment about setsuccess/error.
---