Repository: incubator-metron
Updated Branches:
  refs/heads/master 7972a9093 -> 23ff817eb


METRON-496: Field transformations are applied after parser validation (closes apache/incubator-metron#304)


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/23ff817e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/23ff817e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/23ff817e

Branch: refs/heads/master
Commit: 23ff817ebdd3b63be69ad9b6c493b41bb0be4880
Parents: 7972a90
Author: cstella <ceste...@gmail.com>
Authored: Fri Oct 14 10:15:33 2016 -0400
Committer: cstella <ceste...@gmail.com>
Committed: Fri Oct 14 10:15:33 2016 -0400

----------------------------------------------------------------------
 .../apache/metron/parsers/bolt/ParserBolt.java  |  12 +--
 .../apache/metron/parsers/csv/CSVParser.java    |   4 +-
 .../metron/parsers/bolt/ParserBoltTest.java     | 102 +++++++++++++++++++
 3 files changed, 111 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/23ff817e/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 34dff11..325209f 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -119,13 +119,13 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
         List<FieldValidator> fieldValidations = 
getConfigurations().getFieldValidations();
         Optional<List<JSONObject>> messages = 
parser.parseOptional(originalMessage);
         for (JSONObject message : messages.orElse(Collections.emptyList())) {
-          if (parser.validate(message) && filter != null && 
filter.emitTuple(message, stellarContext)) {
-            message.put(Constants.SENSOR_TYPE, getSensorType());
-            for (FieldTransformer handler : 
sensorParserConfig.getFieldTransformations()) {
-              if (handler != null) {
-                handler.transformAndUpdate(message, 
sensorParserConfig.getParserConfig(), stellarContext);
-              }
+          message.put(Constants.SENSOR_TYPE, getSensorType());
+          for (FieldTransformer handler : 
sensorParserConfig.getFieldTransformations()) {
+            if (handler != null) {
+              handler.transformAndUpdate(message, 
sensorParserConfig.getParserConfig(), stellarContext);
             }
+          }
+          if (parser.validate(message) && filter != null && 
filter.emitTuple(message, stellarContext)) {
             numWritten++;
             if(!isGloballyValid(message, fieldValidations)) {
               message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/23ff817e/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
index 19a768e..52d45c9 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
@@ -78,7 +78,9 @@ public class CSVParser extends BasicParser {
           }
         }
         JSONObject jsonVal = new JSONObject(value);
-        jsonVal.put("timestamp", timestamp);
+        if(timestamp != null) {
+          jsonVal.put("timestamp", timestamp);
+        }
         return ImmutableList.of(jsonVal);
       }
       else {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/23ff817e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 1f62e4f..5d93838 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -24,16 +24,24 @@ import backtype.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.parsers.BasicParser;
+import org.apache.metron.parsers.csv.CSVParser;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.common.writer.MessageWriter;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -76,6 +84,32 @@ public class ParserBoltTest extends BaseBoltTest {
   @Mock
   private Tuple t5;
 
+  private static class RecordingWriter implements 
BulkMessageWriter<JSONObject> {
+    List<JSONObject> records = new ArrayList<>();
+
+    @Override
+    public void init(Map stormConf, WriterConfiguration config) throws 
Exception {
+
+    }
+
+    @Override
+    public BulkWriterResponse write(String sensorType, WriterConfiguration 
configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws 
Exception {
+      records.addAll(messages);
+      BulkWriterResponse ret = new BulkWriterResponse();
+      ret.addAllSuccesses(tuples);
+      return ret;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    public List<JSONObject> getRecords() {
+      return records;
+    }
+  }
+
 
   @Test
   public void testEmpty() throws Exception {
@@ -239,6 +273,74 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.execute(t1);
     verify(outputCollector, times(1)).ack(t1);
   }
+
+  /**
+  {
+     "sensorTopic":"dummy"
+     ,"parserConfig": {
+      "batchSize" : 1
+     }
+      ,"fieldTransformations" : [
+          {
+           "transformation" : "STELLAR"
+          ,"output" : "timestamp"
+          ,"config" : {
+            "timestamp" : "TO_EPOCH_TIMESTAMP(timestampstr, 'yyyy-MM-dd HH:mm:ss', 'UTC')"
+                      }
+          }
+                               ]
+   }
+   */
+  @Multiline
+  public static String csvWithFieldTransformations;
+
+  @Test
+  public void testFieldTransformationPriorToValidation() {
+    String sensorType = "dummy";
+    RecordingWriter recordingWriter = new RecordingWriter();
+    //create a parser which acts like a basic parser but returns no timestamp field.
+    BasicParser dummyParser = new BasicParser() {
+      @Override
+      public void init() {
+
+      }
+
+      @Override
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return ImmutableList.of(new JSONObject() {{
+                put("data", "foo");
+                put("timestampstr", "2016-01-05 17:02:30");
+                put("original_string", "blah");
+              }});
+      }
+
+      @Override
+      public void configure(Map<String, Object> config) {
+
+      }
+    };
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, 
dummyParser, new WriterHandler(recordingWriter)) {
+      @Override
+      protected SensorParserConfig getSensorParserConfig() {
+        try {
+          return 
SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    when(t1.getBinary(0)).thenReturn(new byte[] {});
+    parserBolt.execute(t1);
+    Assert.assertEquals(1, recordingWriter.getRecords().size());
+    long expected = 1452013350000L;
+    Assert.assertEquals(expected, 
recordingWriter.getRecords().get(0).get("timestamp"));
+  }
+
+
+
   @Test
   public void testBatchOfOne() throws Exception {
 

Reply via email to