METRON-1584 Indexing Topology Crashes with Invalid Message (nickwallen) closes apache/metron#1036


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b6808f76
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b6808f76
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b6808f76

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: b6808f76134260e8a3c24c7641d6e4a235ef48e1
Parents: 45e3ed8
Author: nickwallen <n...@nickallen.org>
Authored: Wed Jun 6 11:07:30 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Wed Jun 6 11:07:30 2018 -0400

----------------------------------------------------------------------
 .../bolt/BulkMessageWriterBoltTest.java         | 52 +++++++++--
 .../metron/writer/BulkWriterComponent.java      | 36 +++++++-
 .../writer/bolt/BulkMessageWriterBolt.java      | 92 +++++++++++++++-----
 3 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index dedf5e6..52516ac 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -118,28 +118,32 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
   private MessageGetStrategy messageGetStrategy;
 
   @Test
-  public void testSensorTypeMissing() throws Exception {
+  public void testSourceTypeMissing() throws Exception {
+
+    // setup the bolt
     BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
-            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withBulkMessageWriter(bulkMessageWriter)
+            .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
 
+    // initialize the bolt
     bulkMessageWriterBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("error"), argThat(
-            new FieldsMatcher("message")));
     Map stormConf = new HashMap();
     bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
-    BulkWriterComponent<JSONObject> component = 
mock(BulkWriterComponent.class);
-    bulkMessageWriterBolt.setWriterComponent(component);
-    verify(bulkMessageWriter, 
times(1)).init(eq(stormConf),any(TopologyContext.class), 
any(WriterConfiguration.class));
+
+    // create a message with no source type
     JSONObject message = (JSONObject) new 
JSONParser().parse(sampleMessageString);
     message.remove("source.type");
     when(tuple.getValueByField("message")).thenReturn(message);
+
+    // the tuple should be handled as an error and ack'd
     bulkMessageWriterBolt.execute(tuple);
-    verify(component, times(1)).error(eq("null"), any(), any(), any());
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
+    verify(outputCollector, times(1)).ack(tuple);
   }
 
   @Test
@@ -295,4 +299,36 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     assertEquals(3, tupleList.size());
     verify(outputCollector, times(5)).ack(tuple);  // 3 messages + 2nd tick
   }
+
+  /**
+   * If an invalid message is sent to indexing, the message should be handled as an error
+   * and the topology should continue processing.
+   */
+  @Test
+  public void testMessageInvalid() throws Exception {
+    FakeClock clock = new FakeClock();
+
+    // setup the bolt
+    BulkMessageWriterBolt bolt = new BulkMessageWriterBolt("zookeeperUrl")
+            .withBulkMessageWriter(bulkMessageWriter)
+            .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name())
+            .withMessageGetterField("message");
+    bolt.setCuratorFramework(client);
+    bolt.setZKCache(cache);
+    bolt.getConfigurations().updateSensorIndexingConfig(sensorType, new 
FileInputStream(sampleSensorIndexingConfigPath));
+
+    // initialize the bolt
+    bolt.declareOutputFields(declarer);
+    Map stormConf = new HashMap();
+    bolt.prepare(stormConf, topologyContext, outputCollector, clock);
+
+    // execute a tuple that contains an invalid message
+    byte[] invalidJSON = "this is not valid JSON".getBytes();
+    when(tuple.getBinary(0)).thenReturn(invalidJSON);
+    bolt.execute(tuple);
+
+    // the tuple should be handled as an error and ack'd
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
+    verify(outputCollector, times(1)).ack(tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 37c624f..dfa265d 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -19,6 +19,7 @@
 package org.apache.metron.writer;
 
 import com.google.common.collect.Iterables;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
@@ -41,6 +42,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import static java.lang.String.format;
+
 /**
  * This component implements message batching, with both flush on queue size, and flush on queue timeout.
  * There is a queue for each sensorType.
@@ -115,15 +118,40 @@ public class BulkWriterComponent<MESSAGE_T> {
   }
 
   public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, 
MessageGetStrategy messageGetStrategy) {
-    tuples.forEach(t -> collector.ack(t));
+    LOG.error(format("Failing %d tuple(s); sensorType=%s", 
Iterables.size(tuples), sensorType), e);
     MetronError error = new MetronError()
             .withSensorType(sensorType)
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
-    if(!Iterables.isEmpty(tuples)) {
-      LOG.error("Failing {} tuples", Iterables.size(tuples), e);
-    }
     tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+    handleError(tuples, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * <p>Without a valid message, the source type is unknown.
+   * <p>Without a valid message, the JSON message cannot be added to the error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable<Tuple> tuples) {
+    LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e);
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e);
+    handleError(tuples, error);
+  }
+
+  /**
+   * Errors a set of tuples.
+   *
+   * @param tuples The tuples to error.
+   * @param error The error passed to {@code ErrorUtils.handleError} after the tuples are ack'd.
+   */
+  private void handleError(Iterable<Tuple> tuples, MetronError error) {
+    tuples.forEach(t -> collector.ack(t));
     ErrorUtils.handleError(collector, error);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/b6808f76/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index b5b97d8..1d8f0c6 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -234,41 +234,87 @@ public class BulkMessageWriterBolt extends 
ConfiguredIndexingBolt {
 
     try
     {
-      JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
+      JSONObject message = getMessage(tuple);
+      if(message == null) {
+        handleMissingMessage(tuple);
+        return;
+      }
+
       String sensorType = MessageUtils.getSensorType(message);
+      if(sensorType == null) {
+        handleMissingSensorType(tuple, message);
+        return;
+      }
+
       LOG.trace("Writing enrichment message: {}", message);
       WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
               new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
-      if(sensorType == null) {
-        //sensor type somehow ended up being null.  We want to error this message directly.
-        getWriterComponent().error("null"
-                             , new Exception("Sensor type is not specified for 
message "
-                                            + message.toJSONString()
-                                            )
-                             , ImmutableList.of(tuple)
-                             , messageGetStrategy
-                             );
-      }
-      else {
-        if (writerConfiguration.isDefault(sensorType)) {
-          //want to warn, but not fail the tuple
-          collector.reportError(new Exception("WARNING: Default and (likely) 
unoptimized writer config used for " + bulkMessageWriter.getName() + " writer 
and sensor " + sensorType));
-        }
 
-        getWriterComponent().write(sensorType
-                , tuple
-                , message
-                , bulkMessageWriter
-                , writerConfiguration
-                , messageGetStrategy
-        );
+      if (writerConfiguration.isDefault(sensorType)) {
+        //want to warn, but not fail the tuple
+        collector.reportError(new Exception("WARNING: Default and (likely) 
unoptimized writer config used for " + bulkMessageWriter.getName() + " writer 
and sensor " + sensorType));
       }
+
+      getWriterComponent().write(sensorType
+              , tuple
+              , message
+              , bulkMessageWriter
+              , writerConfiguration
+              , messageGetStrategy
+      );
     }
     catch(Exception e) {
       throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
     }
   }
 
+  /**
+   * Retrieves the JSON message contained in a tuple.
+   *
+   * @param tuple The tuple containing a JSON message.
+   * @return The JSON message contained in the tuple. If none, returns null.
+   */
+  private JSONObject getMessage(Tuple tuple) {
+    JSONObject message = null;
+    try {
+      message = (JSONObject) messageGetStrategy.get(tuple);
+
+    } catch(Throwable e) {
+      LOG.error("Unable to retrieve message from tuple", e);
+    }
+
+    return message;
+  }
+
+  /**
+   * Handles error processing when a message is missing a sensor type.
+   *
+   * @param tuple The tuple.
+   * @param message The message with no sensor type.
+   */
+  private void handleMissingSensorType(Tuple tuple, JSONObject message) {
+    // sensor type somehow ended up being null.  We want to error this message directly.
+    LOG.debug("Message is missing sensor type");
+    getWriterComponent().error("null",
+            new Exception("Sensor type is not specified for message " + 
message.toJSONString()),
+            ImmutableList.of(tuple),
+            messageGetStrategy
+    );
+  }
+
+  /**
+   * Handles error processing when a tuple does not contain a valid message.
+   *
+   * @param tuple The tuple.
+   */
+  private void handleMissingMessage(Tuple tuple) {
+    LOG.debug("Unable to extract message from tuple; expected valid JSON");
+    getWriterComponent().error(
+            new Exception("Unable to extract message from tuple; expected 
valid JSON"),
+            ImmutableList.of(tuple)
+    );
+  }
+
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
     declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));

Reply via email to