METRON-1584 Indexing Topology Crashes with Invalid Message (nickwallen) closes apache/metron#1036
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b6808f76 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b6808f76 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b6808f76 Branch: refs/heads/feature/METRON-1554-pcap-query-panel Commit: b6808f76134260e8a3c24c7641d6e4a235ef48e1 Parents: 45e3ed8 Author: nickwallen <n...@nickallen.org> Authored: Wed Jun 6 11:07:30 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed Jun 6 11:07:30 2018 -0400 ---------------------------------------------------------------------- .../bolt/BulkMessageWriterBoltTest.java | 52 +++++++++-- .../metron/writer/BulkWriterComponent.java | 36 +++++++- .../writer/bolt/BulkMessageWriterBolt.java | 92 +++++++++++++++----- 3 files changed, 145 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index dedf5e6..52516ac 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -118,28 +118,32 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { private MessageGetStrategy messageGetStrategy; @Test - public void testSensorTypeMissing() throws Exception { + public void testSourceTypeMissing() throws Exception { + + // setup the bolt BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + // initialize the bolt bulkMessageWriterBolt.declareOutputFields(declarer); - verify(declarer, times(1)).declareStream(eq("error"), argThat( - new FieldsMatcher("message"))); Map stormConf = new HashMap(); bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); - BulkWriterComponent<JSONObject> component = mock(BulkWriterComponent.class); - bulkMessageWriterBolt.setWriterComponent(component); - verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); + + // create a message with no source type JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString); message.remove("source.type"); when(tuple.getValueByField("message")).thenReturn(message); + + // the tuple should be handled as an error and ack'd bulkMessageWriterBolt.execute(tuple); - verify(component, times(1)).error(eq("null"), any(), any(), any()); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any()); + verify(outputCollector, times(1)).ack(tuple); } @Test @@ -295,4 +299,36 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { assertEquals(3, tupleList.size()); verify(outputCollector, times(5)).ack(tuple); // 3 messages + 2nd tick } + + /** + * If an invalid message is sent to indexing, the message should be handled as an error + * and the topology should continue processing. + */ + @Test + public void testMessageInvalid() throws Exception { + FakeClock clock = new FakeClock(); + + // setup the bolt + BulkMessageWriterBolt bolt = new BulkMessageWriterBolt("zookeeperUrl") + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name()) + .withMessageGetterField("message"); + bolt.setCuratorFramework(client); + bolt.setZKCache(cache); + bolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + + // initialize the bolt + bolt.declareOutputFields(declarer); + Map stormConf = new HashMap(); + bolt.prepare(stormConf, topologyContext, outputCollector, clock); + + // execute a tuple that contains an invalid message + byte[] invalidJSON = "this is not valid JSON".getBytes(); + when(tuple.getBinary(0)).thenReturn(invalidJSON); + bolt.execute(tuple); + + // the tuple should be handled as an error and ack'd + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any()); + verify(outputCollector, times(1)).ack(tuple); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 37c624f..dfa265d 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -19,6 +19,7 @@ package org.apache.metron.writer; import com.google.common.collect.Iterables; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; @@ -41,6 +42,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import static java.lang.String.format; + /** * This component implements message batching, with both flush on queue size, and flush on queue timeout. * There is a queue for each sensorType. @@ -115,15 +118,40 @@ public class BulkWriterComponent<MESSAGE_T> { } public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { - tuples.forEach(t -> collector.ack(t)); + LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e); MetronError error = new MetronError() .withSensorType(sensorType) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); - if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing {} tuples", Iterables.size(tuples), e); - } tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + handleError(tuples, error); + } + + /** + * Error a set of tuples that may not contain a valid message. + * + * <p>Without a valid message, the source type is unknown. + * <p>Without a valid message, the JSON message cannot be added to the error. + * + * @param e The exception that occurred. + * @param tuples The tuples to error that may not contain valid messages. + */ + public void error(Throwable e, Iterable<Tuple> tuples) { + LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e); + handleError(tuples, error); + } + + /** + * Errors a set of tuples. + * + * @param tuples The tuples to error. + * @param error + */ + private void handleError(Iterable<Tuple> tuples, MetronError error) { + tuples.forEach(t -> collector.ack(t)); ErrorUtils.handleError(collector, error); } http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index b5b97d8..1d8f0c6 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -234,41 +234,87 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { try { - JSONObject message = (JSONObject) messageGetStrategy.get(tuple); + JSONObject message = getMessage(tuple); + if(message == null) { + handleMissingMessage(tuple); + return; + } + String sensorType = MessageUtils.getSensorType(message); + if(sensorType == null) { + handleMissingSensorType(tuple, message); + return; + } + LOG.trace("Writing enrichment message: {}", message); WriterConfiguration writerConfiguration = configurationTransformation.apply( new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); - if(sensorType == null) { - //sensor type somehow ended up being null. We want to error this message directly. - getWriterComponent().error("null" - , new Exception("Sensor type is not specified for message " - + message.toJSONString() - ) - , ImmutableList.of(tuple) - , messageGetStrategy - ); - } - else { - if (writerConfiguration.isDefault(sensorType)) { - //want to warn, but not fail the tuple - collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); - } - getWriterComponent().write(sensorType - , tuple - , message - , bulkMessageWriter - , writerConfiguration - , messageGetStrategy - ); + if (writerConfiguration.isDefault(sensorType)) { + //want to warn, but not fail the tuple + collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); } + + getWriterComponent().write(sensorType + , tuple + , message + , bulkMessageWriter + , writerConfiguration + , messageGetStrategy + ); } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); } } + /** + * Retrieves the JSON message contained in a tuple. + * + * @param tuple The tuple containing a JSON message. + * @return The JSON message contained in the tuple. If none, returns null. + */ + private JSONObject getMessage(Tuple tuple) { + JSONObject message = null; + try { + message = (JSONObject) messageGetStrategy.get(tuple); + + } catch(Throwable e) { + LOG.error("Unable to retrieve message from tuple", e); + } + + return message; + } + + /** + * Handles error processing when a message is missing a sensor type. + * + * @param tuple The tuple. + * @param message The message with no sensor type. + */ + private void handleMissingSensorType(Tuple tuple, JSONObject message) { + // sensor type somehow ended up being null. We want to error this message directly. + LOG.debug("Message is missing sensor type"); + getWriterComponent().error("null", + new Exception("Sensor type is not specified for message " + message.toJSONString()), + ImmutableList.of(tuple), + messageGetStrategy + ); + } + + /** + * Handles error processing when a tuple does not contain a valid message. + * + * @param tuple The tuple. + */ + private void handleMissingMessage(Tuple tuple) { + LOG.debug("Unable to extract message from tuple; expected valid JSON"); + getWriterComponent().error( + new Exception("Unable to extract message from tuple; expected valid JSON"), + ImmutableList.of(tuple) + ); + } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));