Github user merrimanr commented on a diff in the pull request:
https://github.com/apache/metron/pull/1213#discussion_r220988475
--- Diff:
metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
---
@@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
}
}
- protected void initializeStellar() {
- Context.Builder builder = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
- ;
- if(cache != null) {
- builder = builder.with(Context.Capabilities.CACHE, () -> cache);
- }
- this.stellarContext = builder.build();
- StellarFunctions.initialize(stellarContext);
- }
-
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
- try {
- for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
- entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "This should have been caught in the writerHandler. If you see this, file a JIRA", e);
- } finally {
- collector.ack(tuple);
- }
+ handleTickTuple(tuple);
return;
}
-
+ numWritten = 0;
byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
+ String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+ String sensorType = topicToSensorMap.get(topic);
try {
- SensorParserConfig sensorParserConfig;
- MessageParser<JSONObject> parser;
- String sensor;
- Map<String, Object> metadata;
- if (sensorToComponentMap.size() == 1) {
--- End diff --
I refactored this to always look up the sensor from the topic. I think
this makes it easier to read and test.
---