METRON-1751 Storm Profiler dies when consuming null message (nickwallen) closes apache/metron#1176
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d32bd50d Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d32bd50d Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d32bd50d Branch: refs/remotes/upstream/feature/METRON-1699-create-batch-profiler Commit: d32bd50d43aae87af9ec12d2daea83b2f4eca342 Parents: 661e23e Author: nickwallen <n...@nickallen.org> Authored: Wed Aug 29 14:55:58 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed Aug 29 14:55:58 2018 -0400 ---------------------------------------------------------------------- .../profiler/bolt/ProfileSplitterBolt.java | 29 +++++++++++--------- .../profiler/bolt/ProfileSplitterBoltTest.java | 16 +++++++++++ 2 files changed, 32 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d32bd50d/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index f28411f..87f1ba9 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -144,9 +144,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { try { doExecute(input); - } catch (IllegalArgumentException | ParseException | UnsupportedEncodingException e) { - LOG.error("Unexpected error", e); - collector.reportError(e); + } catch (Throwable t) { + LOG.error("Unexpected error", t); + collector.reportError(t); } finally { collector.ack(input); @@ -157,22 +157,25 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { // retrieve the input message byte[] data = input.getBinary(0); + if(data == null) { + LOG.debug("Received null message. Nothing to do."); + return; + } JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8")); // ensure there is a valid profiler configuration ProfilerConfig config = getProfilerConfig(); - if(config != null && config.getProfiles().size() > 0) { - - // what time is it? - Clock clock = clockFactory.createClock(config); - Optional<Long> timestamp = clock.currentTimeMillis(message); + if(config == null || getProfilerConfig().getProfiles().size() == 0) { + LOG.debug("No Profiler configuration found. Nothing to do."); + return; + } - // route the message. if a message does not contain the timestamp field, it cannot be routed. - timestamp.ifPresent(ts -> routeMessage(input, message, config, ts)); + // what time is it? + Clock clock = clockFactory.createClock(config); + Optional<Long> timestamp = clock.currentTimeMillis(message); - } else { - LOG.debug("No Profiler configuration found. Nothing to do."); - } + // route the message. if a message does not contain the timestamp field, it cannot be routed. + timestamp.ifPresent(ts -> routeMessage(input, message, config, ts)); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/d32bd50d/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java index bf81923..72e2b72 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java @@ -404,6 +404,22 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { .emit(any(Values.class)); } + @Test + public void testWithNullMessage() throws Exception { + + // ensure the tuple returns null to mimic a null message in kafka + when(tuple.getBinary(0)).thenReturn(null); + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should NOT be emitted for the downstream profile builder + verify(outputCollector, times(0)) + .emit(any(Values.class)); + + } + /** * Creates a ProfilerConfig based on a string containing JSON. *