Repository: apex-malhar Updated Branches: refs/heads/master b8ca9d63f -> 2fe2903bf
APEXMALHAR-2479-regexparser-example Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2fe2903b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2fe2903b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2fe2903b Branch: refs/heads/master Commit: 2fe2903bfe65055c7fd361a97549b6e7e12d4289 Parents: b8ca9d6 Author: venkateshDT <[email protected]> Authored: Wed Apr 19 00:24:52 2017 -0700 Committer: venkateshDT <[email protected]> Committed: Tue Apr 25 17:10:15 2017 -0700 ---------------------------------------------------------------------- examples/parser/README.md | 21 +++- .../parser/regexparser/FileOutputOperator.java | 52 +++++++++ .../regexparser/RegexParserApplication.java | 46 ++++++++ .../examples/parser/regexparser/ServerLog.java | 116 +++++++++++++++++++ .../parser/regexparser/ServerLogGenerator.java | 64 ++++++++++ .../regexparser/RegexParserApplicationTest.java | 94 +++++++++++++++ .../parser/src/test/resources/log4j.properties | 2 +- .../properties-regexParserApplication.xml | 88 ++++++++++++++ 8 files changed, 480 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/README.md ---------------------------------------------------------------------- diff --git a/examples/parser/README.md b/examples/parser/README.md index 51052b9..b7f930a 100644 --- a/examples/parser/README.md +++ b/examples/parser/README.md @@ -5,7 +5,7 @@ This app showcases **Json Parser**. Data generator sends Json data to the Json P * **Csv Parser App** -This application showcases how to use CsvParser from [Apex Malhar](https://github.com/apache/apex-malhar) library. The CsvParser converts your delimited data to a key-value pair map or concrete java class also know as [POJO](https://en.wikipedia.org/wiki/Plain_Old_Java_Object). 
The parser emits key-value pair map on *parsedOutput* port. It emits POJO on *out* and error records on *err* port. +This application showcases how to use [CsvParser](https://datatorrent.com/docs/apidocs/com/datatorrent/contrib/parser/CsvParser.html) from [Apex Malhar](https://github.com/apache/apex-malhar) library. The CsvParser converts your delimited data to a key-value pair map or concrete java class also known as [POJO](https://en.wikipedia.org/wiki/Plain_Old_Java_Object). The parser emits key-value pair map on *parsedOutput* port. It emits POJO on *out* and error records on *err* port. Follow these steps to run this application: @@ -21,7 +21,8 @@ that the output by checking hdfs file path configured in properties-csvParseAppl * **Xml Parser App** -This application showcases how to use XmlParser from [Apex Malhar](https://github.com/apache/apex-malhar) library. The XmlParser Operator converts XML string to POJO. +This application showcases how to use [XmlParser](https://datatorrent.com/docs/apidocs/com/datatorrent/lib/parser/XmlParser.html) +from [Apex Malhar](https://github.com/apache/apex-malhar) library. The XmlParser Operator converts XML string to POJO. The parser emits dom based Document on *parsedOutput* port. It emits POJO on *out* and error records on *err* port. Follow these steps to run this application: @@ -34,3 +35,19 @@ the commandline using `apex` cli script. **Step 3**: During launch use `src/main/resources/META-INF/properties-xmlParseApplication.xml` as a custom configuration file; then verify that the output by checking hdfs file path configured in properties-xmlParseApplication.xml + +* **RegexParser App** + +This application showcases how to use [RegexParser](https://datatorrent.com/docs/apidocs/com/datatorrent/contrib/parser/RegexParser.html) from [Apex Malhar](https://github.com/apache/apex-malhar) library. 
+ +Follow these steps to run this application: + +**Step 1**: Build the code: + + shell> mvn clean install + +**Step 2**: Upload the `target/parser-1.0-SNAPSHOT.apa` to the UI console if available or launch it from +the command line using the `apex` CLI script. + +**Step 3**: During launch use `properties-regexParserApplication.xml` as a custom configuration file; then verify +the output by checking the HDFS file path configured in properties-regexParserApplication.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/FileOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/FileOutputOperator.java b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/FileOutputOperator.java new file mode 100644 index 0000000..b37d3f6 --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/FileOutputOperator.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.parser.regexparser; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +public class FileOutputOperator extends AbstractFileOutputOperator<Object> +{ + @NotNull + private String outputFileName; + + @Override + protected String getFileName(Object tuple) + { + return outputFileName; + } + + @Override + protected byte[] getBytesForTuple(Object tuple) + { + return (tuple.toString() + "\n").getBytes(); + } + + public String getOutputFileName() + { + return outputFileName; + } + + public void setOutputFileName(String outputFileName) + { + this.outputFileName = outputFileName; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/RegexParserApplication.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/RegexParserApplication.java b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/RegexParserApplication.java new file mode 100644 index 0000000..3935793 --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/RegexParserApplication.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.parser.regexparser; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.parser.RegexParser; + +@ApplicationAnnotation(name = "RegexParser") +public class RegexParserApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + ServerLogGenerator logGenerator = dag.addOperator("logGenerator", ServerLogGenerator.class); + RegexParser regexParser = dag.addOperator("regexParser", RegexParser.class); + dag.setOutputPortAttribute(regexParser.out, Context.PortContext.TUPLE_CLASS, ServerLog.class); + + FileOutputOperator regexWriter = dag.addOperator("regexWriter", FileOutputOperator.class); + FileOutputOperator regexErrorWriter = dag.addOperator("regexErrorWriter", FileOutputOperator.class); + + dag.addStream("regexInput", logGenerator.outputPort, regexParser.in); + dag.addStream("regexOutput", regexParser.out, regexWriter.input); + dag.addStream("regexError", regexParser.err, regexErrorWriter.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLog.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLog.java 
b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLog.java new file mode 100644 index 0000000..b446f8d --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLog.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.parser.regexparser; + +import java.util.Date; + +public class ServerLog +{ + private Date date; + private int id; + private String signInId; + private String ipAddress; + private String serviceId; + private String accountId; + private String platform; + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public Date getDate() + { + return date; + } + + public void setDate(Date date) + { + this.date = date; + } + + public String getSignInId() + { + return signInId; + } + + public void setSignInId(String signInId) + { + this.signInId = signInId; + } + + public String getIpAddress() + { + return ipAddress; + } + + public void setIpAddress(String ipAddress) + { + this.ipAddress = ipAddress; + } + + public String getServiceId() + { + return serviceId; + } + + public void setServiceId(String serviceId) + { + this.serviceId = serviceId; + } + + public String getAccountId() + { + return accountId; + } + + public void setAccountId(String accountId) + { + this.accountId = accountId; + } + + public String getPlatform() + { + return platform; + } + + public void setPlatform(String platform) + { + this.platform = platform; + } + + @Override + public String toString() + { + return "ServerLog{" + + "date=" + date + + ", id=" + id + + ", signInId='" + signInId + '\'' + + ", ipAddress='" + ipAddress + '\'' + + ", serviceId='" + serviceId + '\'' + + ", accountId='" + accountId + '\'' + + ", platform='" + platform + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLogGenerator.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLogGenerator.java b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLogGenerator.java new file mode 100644 
index 0000000..9db1b51 --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/regexparser/ServerLogGenerator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.parser.regexparser; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class ServerLogGenerator extends BaseOperator implements InputOperator +{ + public transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<byte[]>(); + private int tupleRate = 10; + private transient int tuplesEmmitedinWindow = 0; + + public int getTupleRate() + { + return tupleRate; + } + + public void setTupleRate(int tupleRate) + { + this.tupleRate = tupleRate; + } + + @Override + public void emitTuples() + { + while (tuplesEmmitedinWindow < tupleRate) { + String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" + + " 2015:10:01:03:14:49 101 [email protected] ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " + + "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik"; + 
outputPort.emit(line.getBytes()); + tuplesEmmitedinWindow++; + } + if (tuplesEmmitedinWindow == tupleRate) { + String errorLine = "This is error line that will be emitted on the output port"; + outputPort.emit(errorLine.getBytes()); + tuplesEmmitedinWindow++; + } + } + + @Override + public void endWindow() + { + tuplesEmmitedinWindow = 0; + super.endWindow(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/test/java/org/apache/apex/examples/parser/regexparser/RegexParserApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/test/java/org/apache/apex/examples/parser/regexparser/RegexParserApplicationTest.java b/examples/parser/src/test/java/org/apache/apex/examples/parser/regexparser/RegexParserApplicationTest.java new file mode 100644 index 0000000..e7accee --- /dev/null +++ b/examples/parser/src/test/java/org/apache/apex/examples/parser/regexparser/RegexParserApplicationTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.parser.regexparser; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +public class RegexParserApplicationTest +{ + + @Test + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/properties-regexParserApplication.xml")); + conf.setLong("dt.application.RegexParser.operator.logGenerator.prop.tupleRate", 10); + final String dataFolderPath = conf.get("dt.application.RegexParser.operator.*.prop.filePath"); + final String dataFileName = conf + .get("dt.application.RegexParser.operator.regexWriter.prop.outputFileName"); + + FileUtils.deleteDirectory(new File(dataFolderPath)); + lma.prepareDAG(new RegexParserApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + if (new File(dataFolderPath).exists()) { + Collection<File> files = FileUtils.listFiles(new File(dataFolderPath), new WildcardFileFilter(dataFileName + + "*"), null); + if (files.size() >= 1) { + File parsedFile = files.iterator().next(); + String fileData = FileUtils.readFileToString(parsedFile); + String[] regexData = fileData.split("\n"); + return regexData.length == 10; + } + } + return false; + } + }); + + lc.run(30 * 1000); // runs for 30 seconds and quitxs + + Collection<File> files = 
FileUtils.listFiles(new File(dataFolderPath), + new WildcardFileFilter(dataFileName + "*"), null); + File parsedFile = files.iterator().next(); + String fileData = FileUtils.readFileToString(parsedFile); + String[] logData = fileData.split("\n"); + for (String logLine : logData) { + Assert.assertTrue(logLine.contains("id=" + 101)); + Assert.assertTrue(logLine.contains("signInId=" + "'[email protected]'")); + Assert.assertTrue(logLine.contains("serviceId=" + "'IP1234-NPB12345_00'")); + Assert.assertTrue(logLine.contains("accountId=" + "'11111'")); + Assert.assertTrue(logLine.contains("platform=" + "'pik'")); + } + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/parser/src/test/resources/log4j.properties b/examples/parser/src/test/resources/log4j.properties index 41c733d..3776bbe 100644 --- a/examples/parser/src/test/resources/log4j.properties +++ b/examples/parser/src/test/resources/log4j.properties @@ -23,7 +23,7 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=INFO +test.log.console.threshold=WARN log4j.appender.RFA=org.apache.log4j.RollingFileAppender log4j.appender.RFA.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fe2903b/examples/parser/src/test/resources/properties-regexParserApplication.xml ---------------------------------------------------------------------- diff --git a/examples/parser/src/test/resources/properties-regexParserApplication.xml 
b/examples/parser/src/test/resources/properties-regexParserApplication.xml new file mode 100644 index 0000000..f143ab0 --- /dev/null +++ b/examples/parser/src/test/resources/properties-regexParserApplication.xml @@ -0,0 +1,88 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + + <property> + <name>dt.application.RegexParser.operator.regexParser.prop.schema</name> + <value>{ + "fields": [ + { + "name": "date", + "type": "Date", + "constraints": { + "format": "yyyy:MM:dd:hh:mm:ss" + } + }, + { + "name": "id", + "type": "Integer" + }, + { + "name": "signInId", + "type": "String" + + }, + { + "name": "ipAddress", + "type": "String" + }, + { + "name": "serviceId", + "type": "String" + }, + { + "name": "accountId", + "type": "String" + }, + { + "name": "platform", + "type": "String" + } + ] + } + </value> + </property> + <property> + <name>dt.application.RegexParser.operator.regexParser.port.out.attr.TUPLE_CLASS</name> + <value>org.apache.apex.examples.parser.regexparser.ServerLog</value> + </property> + + <property> + <name>dt.application.RegexParser.operator.regexParser.prop.splitRegexPattern</name> + <value>.+\[SEQ=\w+\]\s*(\d+:[\d\d:]+)\s(\d+)\s* sign-in_id=(\S+) .*ip_address=(\S+).* service_id=(\S+).*account_id=(\S+).*platform=(\S+)</value> + </property> + + <property> + <name>dt.application.RegexParser.operator.*.prop.filePath</name> + <value>/tmp/application/parser/regexparser</value> + </property> + + <property> + <name>dt.application.RegexParser.operator.regexErrorWriter.prop.outputFileName</name> + <value>errordata</value> + </property> + + <property> + <name>dt.application.RegexParser.operator.regexWriter.prop.outputFileName</name> + <value>outputdata</value> + </property> +</configuration>
