Repository: apex-malhar Updated Branches: refs/heads/master ec7b480ac -> 23061c224
APEXMALHAR-2218: Creation of RegexSplitter operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ffba8a22 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ffba8a22 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ffba8a22 Branch: refs/heads/master Commit: ffba8a2258f9ced1eb430f13e8909bbdf1c173c7 Parents: dd5341f Author: venkateshDT <[email protected]> Authored: Tue Aug 30 14:59:44 2016 -0700 Committer: venkateshDT <[email protected]> Committed: Mon Feb 27 01:29:01 2017 -0800 ---------------------------------------------------------------------- .../datatorrent/contrib/parser/RegexParser.java | 234 +++++++++++++++++ .../contrib/parser/RegexParserTest.java | 261 +++++++++++++++++++ .../src/test/resources/RegexSplitterschema.json | 20 ++ 3 files changed, 515 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java new file mode 100644 index 0000000..a68c928 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/RegexParser.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.beanutils.ConversionException; +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.DateConverter; +import org.apache.commons.beanutils.converters.DateTimeConverter; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Operator parses tuple based on regex pattern and populates POJO matching the user defined schema <br> + * This operator expects the upstream operator to send every line in the file as a byte array. 
+ * splitRegexPattern contains the regex pattern of lines in the file <br> + * Schema is specified in a json format as per {@link DelimitedSchema} that + * contains field information and constraints for each field.<br> + * Schema field names should match with the POJO variable names<br> + * Assumption is that each field in the delimited data should map to a simple + * java type.<br> + * <br> + * <b>Properties</b> <br> + * <b>splitRegexPattern</b>:Regex pattern as a string<br> + * + * @displayName RegexParser + * @category Parsers + * @tags pojo parser regex logs server + * @since 3.7.0 + */ +@InterfaceStability.Evolving +public class RegexParser extends Parser<byte[], KeyValPair<String, String>> +{ + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ + @NotNull + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; + /** + * Regex Pattern defined for the tuple + */ + @NotNull + private String splitRegexPattern; + /** + * Pattern to store the compiled regex + */ + private transient Pattern pattern; + + @Override + public void setup(Context.OperatorContext context) + { + delimitedParserSchema = new DelimitedSchema(schema); + pattern = Pattern.compile(splitRegexPattern); + } + + @Override + public void processTuple(byte[] tuple) + { + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(null, "Blank/null tuple")); + } + errorTupleCount++; + return; + } + String incomingString = new String(tuple); + if (StringUtils.isBlank(incomingString)) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, "Blank tuple")); + } + errorTupleCount++; + return; + } + try { + if (out.isConnected() && clazz != null) { + Matcher matcher = pattern.matcher(incomingString); + boolean patternMatched = false; + Constructor<?> ctor = clazz.getConstructor(); + Object object = 
ctor.newInstance(); + + if (matcher.find()) { + for (int i = 0; i <= matcher.groupCount()-1; i++) { + if (delimitedParserSchema.getFields().get(i).getType() == DelimitedSchema.FieldType.DATE) { + DateTimeConverter dtConverter = new DateConverter(); + dtConverter.setPattern((String)delimitedParserSchema.getFields().get(i).getConstraints().get(DelimitedSchema.DATE_FORMAT)); + ConvertUtils.register(dtConverter, Date.class); + } + BeanUtils.setProperty(object, delimitedParserSchema.getFields().get(i).getName(), matcher.group(i+1)); + } + patternMatched = true; + } + if (!patternMatched) { + throw new ConversionException("The incoming tuple do not match with the Regex pattern defined."); + } + + out.emit(object); + emittedObjectCount++; + } + + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException | InstantiationException | ConversionException e) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, e.getMessage())); + logger.debug("Regex Expression : {} Incoming tuple : {}", splitRegexPattern, incomingString); + } + errorTupleCount++; + logger.error("Tuple could not be parsed. 
Reason {}", e.getMessage()); + } + } + + /** + * Set the schema that defines the format of the tuple + * + * @param schema + */ + public void setSchema(String schema) + { + this.schema = schema; + } + + /** + * Set the Regex Pattern expected for the incoming tuple + * + * @param splitRegexPattern + */ + public void setSplitRegexPattern(String splitRegexPattern) + { + this.splitRegexPattern = splitRegexPattern; + } + + /** + * Get the schema value + * + * @return schema + */ + public String getSchema() + { + return schema; + } + + /** + * Get the Regex Pattern value + * + * @return splitRegexPattern + */ + public String getSplitRegexPattern() + { + return splitRegexPattern; + } + + @Override + public Object convert(byte[] tuple) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public KeyValPair<String, String> processErrorTuple(byte[] input) + { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Get errorTupleCount + * + * @return errorTupleCount + */ + @VisibleForTesting + public long getErrorTupleCount() + { + return errorTupleCount; + } + + /** + * Get emittedObjectCount + * + * @return emittedObjectCount + */ + @VisibleForTesting + public long getEmittedObjectCount() + { + return emittedObjectCount; + } + + /** + * Get incomingTuplesCount + * + * @return incomingTuplesCount + */ + @VisibleForTesting + public long getIncomingTuplesCount() + { + return incomingTuplesCount; + } + + private static final Logger logger = LoggerFactory.getLogger(RegexParser.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java new file mode 100644 index 0000000..ee6e029 --- /dev/null +++ 
b/contrib/src/test/java/com/datatorrent/contrib/parser/RegexParserTest.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.codehaus.jettison.json.JSONException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +public class RegexParserTest +{ + RegexParser regex = new RegexParser(); + private CollectorTestSink<Object> error = new CollectorTestSink<Object>(); + + private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>(); + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + @Override + protected void starting(Description description) + { + super.starting(description); + regex.err.setSink(error); + regex.out.setSink(pojoPort); + regex.setSchema(SchemaUtils.jarResourceFileToString("RegexSplitterschema.json")); + 
regex.setSplitRegexPattern(".+\\[SEQ=\\w+\\]\\s*(\\d+:[\\d\\d:]+)\\s(\\d+)\\s*(.+)"); + regex.setClazz(ServerLog.class); + regex.setup(null); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + } + + } + + @Test + public void TestValidInputCase() throws ParseException + { + regex.beginWindow(0); + String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" + + " 2015:10:01:03:14:49 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + regex.in.process(line.getBytes()); + regex.endWindow(); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(ServerLog.class, obj.getClass()); + ServerLog pojo = (ServerLog)obj; + Assert.assertEquals(101, pojo.getId()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy:MM:dd:hh:mm:ss"); + Date date = sdf.parse("2015:10:01:03:14:49"); + Assert.assertEquals(date, pojo.getDate()); + Assert.assertEquals("[email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00" + + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 " + + "platform=pik", pojo.getMessage()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(1, regex.getEmittedObjectCount()); + } + + @Test + public void testEmptyInput() throws JSONException + { + String tuple = ""; + regex.beginWindow(0); + regex.in.process(tuple.getBytes()); + regex.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(1, regex.getErrorTupleCount()); + } + + @Test + public void 
TestInValidDateInputCase() throws ParseException + { + regex.beginWindow(0); + String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" + + " qwerty 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00" + + " result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + regex.in.process(line.getBytes()); + regex.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" + + " qwerty 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik" + , obj.getKey()); + Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + Assert.assertEquals(1, regex.getErrorTupleCount()); + } + + @Test + public void TestInValidIntInputCase() throws ParseException + { + regex.beginWindow(0); + String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " + + "2015:10:01:03:14:46 hskhhskfk [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + regex.in.process(line.getBytes()); + regex.endWindow(); + KeyValPair<String, String> obj = (KeyValPair<String, String>)error.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals("2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " + + 
"2015:10:01:03:14:46 hskhhskfk [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik", + obj.getKey()); + Assert.assertEquals("The incoming tuple do not match with the Regex pattern defined.", obj.getValue()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + Assert.assertEquals(1, regex.getErrorTupleCount()); + } + + @Test + public void testNullInput() throws JSONException + { + regex.beginWindow(0); + regex.in.process(null); + regex.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(1, regex.getErrorTupleCount()); + } + + @Test + public void testParserValidInputMetricVerification() + { + regex.beginWindow(0); + Assert.assertEquals(0, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " + + "2015:10:01:03:14:49 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + regex.in.process(tuple.getBytes()); + regex.endWindow(); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(1, regex.getEmittedObjectCount()); + } + + @Test + public void testParserInvalidInputMetricVerification() + { + regex.beginWindow(0); + Assert.assertEquals(0, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, 
regex.getErrorTupleCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + String tuple = "{" + "\"id\": 2" + "}"; + regex.in.process(tuple.getBytes()); + regex.endWindow(); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(1, regex.getErrorTupleCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + } + + @Test + public void testParserMetricResetVerification() + { + Assert.assertEquals(0, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + String tuple = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717] " + + "2015:10:01:03:14:49 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + regex.beginWindow(0); + regex.in.process(tuple.getBytes()); + regex.endWindow(); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(1, regex.getEmittedObjectCount()); + regex.beginWindow(1); + Assert.assertEquals(0, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(0, regex.getEmittedObjectCount()); + regex.in.process(tuple.getBytes()); + Assert.assertEquals(1, regex.getIncomingTuplesCount()); + Assert.assertEquals(0, regex.getErrorTupleCount()); + Assert.assertEquals(1, regex.getEmittedObjectCount()); + regex.endWindow(); + } + + public static class ServerLog + { + private String message; + private Date date; + private int id; + + public String getMessage() + { + return message; + } + + public void setMessage(String message) + { + this.message = message; + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public Date getDate() + { + return date; + } + + public void 
setDate(Date date) + { + this.date = date; + } + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ffba8a22/contrib/src/test/resources/RegexSplitterschema.json ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/RegexSplitterschema.json b/contrib/src/test/resources/RegexSplitterschema.json new file mode 100644 index 0000000..e62b6c4 --- /dev/null +++ b/contrib/src/test/resources/RegexSplitterschema.json @@ -0,0 +1,20 @@ +{ + "fields": [ + { + "name": "date", + "type": "Date", + "constraints": { + "format": "yyyy:MM:dd:hh:mm:ss" + } + }, + { + "name": "id", + "type": "Integer" + }, + { + "name": "message", + "type": "String" + + } + ] +} \ No newline at end of file
