[
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494357#comment-16494357
]
ASF GitHub Bot commented on METRON-1584:
----------------------------------------
Github user cestella commented on a diff in the pull request:
https://github.com/apache/metron/pull/1036#discussion_r191588901
--- Diff:
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
---
@@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
+
if (isTick(tuple)) {
- try {
- if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
- //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
- LOG.debug("Flushing message queues older than their batchTimeouts");
- getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
- new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
- , messageGetStrategy);
- }
- }
- catch(Exception e) {
- throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);
- }
- finally {
- collector.ack(tuple);
- }
- return;
+ handleTick(tuple);
+
+ } else {
+ handleMessage(tuple);
}
+ }
+
+ /**
+ * Handle a tuple containing a message; anything other than a tick tuple.
+ *
+ * @param tuple The tuple containing a message.
+ */
+ private void handleMessage(Tuple tuple) {
+ try {
+
+ JSONObject message = getMessage(tuple);
+ if(message == null) {
+ handleMissingMessage(tuple);
+ return;
+ }
- try
- {
- JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
String sensorType = MessageUtils.getSensorType(message);
- LOG.trace("Writing enrichment message: {}", message);
- WriterConfiguration writerConfiguration = configurationTransformation.apply(
- new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
if(sensorType == null) {
- //sensor type somehow ended up being null. We want to error this message directly.
- getWriterComponent().error("null"
- , new Exception("Sensor type is not specified for message "
- + message.toJSONString()
- )
- , ImmutableList.of(tuple)
- , messageGetStrategy
- );
- }
- else {
- if (writerConfiguration.isDefault(sensorType)) {
- //want to warn, but not fail the tuple
- collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
- }
-
- getWriterComponent().write(sensorType
- , tuple
- , message
- , bulkMessageWriter
- , writerConfiguration
- , messageGetStrategy
- );
+ handleMissingSensorType(tuple, message);
+ return;
}
+
+ writeMessage(tuple, message, sensorType);
+
+ } catch (Exception e) {
+ throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);
}
- catch(Exception e) {
+ }
+
+ /**
+ * Handles a tick tuple.
+ *
+ * @param tickTuple The tick tuple.
+ */
+ private void handleTick(Tuple tickTuple) {
+
+ try {
+ if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+ //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
+ LOG.debug("Flushing message queues older than their batchTimeouts");
+ getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
+ new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+ , messageGetStrategy);
+ }
+
+ } catch(Exception e) {
throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);
+
+ } finally {
+ collector.ack(tickTuple);
+ }
+ }
+
+ /**
+ * Retrieves the JSON message contained in a tuple.
+ *
+ * @param tuple The tuple containing a JSON message.
+ * @return The JSON message contained in the tuple. If none, returns null.
+ */
+ private JSONObject getMessage(Tuple tuple) {
+
--- End diff --
Lines 284, 288, 292
> Indexing Topology Crashes with Invalid Message
> ----------------------------------------------
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
> Issue Type: Bug
> Reporter: Nick Allen
> Assignee: Nick Allen
> Priority: Major
>
> Per Mohan Venkateshaiah:
> I published the message "adkadknalkda;LK;ad;Da;dD;" to the indexing topic and saw
> that the random access indexing topology worker thread died and could not
> recover until the Kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse
> adkadknalkda;LK;ad;Da;dD; due to null
> at
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
> ~[stormjar.jar:?]
> at
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
> ~[stormjar.jar:?]
> at
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
> ~[stormjar.jar:?]
> at
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> ... 6 more
> Caused by: org.json.simple.parser.ParseException
> at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
> at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269)
> ~[stormjar.jar:?]
> at org.json.simple.parser.JSONParser.parse(JSONParser.java:118)
> ~[stormjar.jar:?]
> at org.json.simple.parser.JSONParser.parse(JSONParser.java:81)
> ~[stormjar.jar:?]
> at org.json.simple.parser.JSONParser.parse(JSONParser.java:75)
> ~[stormjar.jar:?]
> at
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
> ~[stormjar.jar:?]
> at
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
> ~[stormjar.jar:?]
> at
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
> ~[stormjar.jar:?]
> at
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
> ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3]
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763)
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494)
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down
> executor
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2
> 2]-send-queue [INFO] Async loop interrupted!
> 2018-05-24 09:21:22.239 o.a.s.util
> Thread-7-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink-executor[2
> 2] [INFO] Async loop interrupted!
> 2018-05-24 09:21:22.240 o.a.h.m.s.s.StormTimelineMetricsSink Thread-20 [INFO]
> Stopping Storm Metrics Sink
> 2018-05-24 09:21:22.240 o.a.s.d.executor Thread-20 [INFO] Shut down executor
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.241 o.a.s.d.executor Thread-20 [INFO] Shutting down
> executor indexingBolt:[3 3]
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)