Repository: apex-malhar Updated Branches: refs/heads/master a25b6140c -> 9db044741
APEXMALHAR-2304 SQL Support Examples Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9db04474 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9db04474 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9db04474 Branch: refs/heads/master Commit: 9db044741a32d466c2df4d1582ca707fcce47402 Parents: a25b614 Author: Chinmay Kolhatkar <[email protected]> Authored: Mon Oct 24 11:41:04 2016 +0530 Committer: Chinmay Kolhatkar <[email protected]> Committed: Tue Nov 15 16:15:30 2016 +0530 ---------------------------------------------------------------------- demos/pom.xml | 1 + demos/sql/pom.xml | 102 ++++++++++++ demos/sql/src/assemble/appPackage.xml | 59 +++++++ .../sql/sample/FusionStyleSQLApplication.java | 88 +++++++++++ .../sql/sample/PureStyleSQLApplication.java | 65 ++++++++ .../sql/sample/SQLApplicationWithAPI.java | 45 ++++++ .../sql/sample/SQLApplicationWithModelFile.java | 50 ++++++ .../properties-FusionStyleSQLApplication.xml | 65 ++++++++ .../properties-PureStyleSQLApplication.xml | 65 ++++++++ .../properties-SQLApplicationWithAPI.xml | 43 +++++ .../properties-SQLApplicationWithModelFile.xml | 32 ++++ .../src/main/resources/META-INF/properties.xml | 41 +++++ .../main/resources/model/model_file_csv.json | 27 ++++ .../sample/FusionStyleSQLApplicationTest.java | 121 +++++++++++++++ .../sql/sample/PureStyleSQLApplicationTest.java | 155 +++++++++++++++++++ .../sql/sample/SQLApplicationWithAPITest.java | 92 +++++++++++ .../sample/SQLApplicationWithModelFileTest.java | 113 ++++++++++++++ demos/sql/src/test/resources/input.csv | 6 + demos/sql/src/test/resources/log4j.properties | 50 ++++++ sql/pom.xml | 1 - 20 files changed, 1220 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/pom.xml 
---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index 185bb54..c4fda7a 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -168,6 +168,7 @@ <modules> <module>distributedistinct</module> <module>highlevelapi</module> + <module>sql</module> </modules> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/pom.xml ---------------------------------------------------------------------- diff --git a/demos/sql/pom.xml b/demos/sql/pom.xml new file mode 100644 index 0000000..fd00f58 --- /dev/null +++ b/demos/sql/pom.xml @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>sql-demo</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar SQL API Demo</name> + <description>Apex demo applications that use SQL APIs to construct a DAG</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-demos</artifactId> + <version>3.6.0-SNAPSHOT</version> + </parent> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-sql</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- For KafkaTest --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-kafka</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.9.0.0</version> + <classifier>test</classifier> + 
<scope>test</scope> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/assemble/appPackage.xml b/demos/sql/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/demos/sql/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java new file mode 100644 index 0000000..94b02db --- /dev/null +++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import java.util.Date; +import java.util.Map; + +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.apex.malhar.sql.table.StreamEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.ImmutableMap; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.parser.CsvParser; + +/** Demo application mixing hand-built operators with SQL: Kafka input is parsed by a CsvParser operator whose output port is registered as the SQL input table, and the SQL result is written through a FileEndpoint. */ +@ApplicationAnnotation(name = "FusionStyleSQLApplication") +public class FusionStyleSQLApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SQLExecEnvironment env = SQLExecEnvironment.getEnvironment(); + env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str"); + + Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of( + "RowTime", Date.class, + "id",
Integer.class, + "Product", String.class, + "units", Integer.class); + + // Add Kafka Input + KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class); + kafkaInput.setInitialOffset("EARLIEST"); + + // Add CSVParser + CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class); + dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in); + + // Register CSV Parser output as the input table of the SQL statement + env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping)); + + // Register FileEndpoint as the output table of the SQL statement. + env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"), + conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef")))); + + // Add the SQL statement (read from the "sql" property) to the DAG + env.executeSQL(dag, conf.get("sql")); + } + /** Pass-through operator: emits every tuple received on input unchanged on output. */ + public static class PassThroughOperator extends BaseOperator + { + public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(Object o) + { + output.emit(o); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java new file mode 100644 index 0000000..9a727a3 --- /dev/null +++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.apex.malhar.sql.table.KafkaEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +/** Demo application defined through SQL only: a Kafka table and a file table are registered from configuration and one SQL statement is executed against them. */ +@ApplicationAnnotation(name = "PureStyleSQLApplication") +public class PureStyleSQLApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Source definition: table name, CSV schema, Kafka broker and topic + String schemaInName = conf.get("schemaInName"); + String schemaInDef = conf.get("schemaInDef"); + String broker = conf.get("broker"); + String sourceTopic = conf.get("topic"); + + // Destination definition: table name, CSV schema, output folder and file name + String schemaOutName = conf.get("schemaOutName"); + String schemaOutDef = conf.get("schemaOutDef"); + String outputFolder = conf.get("outputFolder"); + String outFilename = conf.get("destFileName"); + + // SQL statement to run against the two registered tables + String sql = conf.get("sql"); + /* Register both tables and the APEXCONCAT function, then hand the SQL and the DAG to executeSQL. */ + SQLExecEnvironment.getEnvironment() + .registerTable(schemaInName, new KafkaEndpoint(broker, sourceTopic, + new CSVMessageFormat(schemaInDef))) + .registerTable(schemaOutName, new FileEndpoint(outputFolder, outFilename, + new
CSVMessageFormat(schemaOutDef))) + .registerFunction("APEXCONCAT", this.getClass(), "apex_concat_str") + .executeSQL(dag, sql); + } + /** Registered above under the SQL name APEXCONCAT; returns s1 followed by s2. */ + public static String apex_concat_str(String s1, String s2) + { + return s1 + s2; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java new file mode 100644 index 0000000..604332b --- /dev/null +++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.sql.sample; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +/** Demo application that registers a single CSV file table through the SQLExecEnvironment API and executes the configured SQL on it. */ +@ApplicationAnnotation(name = "SQLApplicationWithAPI") +public class SQLApplicationWithAPI implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Source definition: table name, CSV schema and path of the input file + String schemaInName = conf.get("csvSchemaInName"); + String schemaIn = conf.get("csvSchemaIn"); + String sourceFile = conf.get("sourceFile"); + /* Only an input table is registered here; the SQL comes from the "sql" property. */ + SQLExecEnvironment.getEnvironment() + .registerTable(schemaInName, new FileEndpoint(sourceFile, new CSVMessageFormat(schemaIn))) + .executeSQL(dag, conf.get("sql")); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java new file mode 100644 index 0000000..2d22b18 --- /dev/null +++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.sample; + +import java.io.File; +import java.io.IOException; + +import org.apache.apex.malhar.sql.SQLExecEnvironment; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +/** Demo application whose table definitions come from a model file (path taken from the "modelFile" property) instead of registerTable calls. */ +@ApplicationAnnotation(name = "SQLApplicationWithModelFile") +public class SQLApplicationWithModelFile implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String modelFile = conf.get("modelFile"); + String model; + try { + model = FileUtils.readFileToString(new File(modelFile), "UTF-8"); // explicit charset: the JSON model must not depend on the platform default + } catch (IOException e) { + throw new RuntimeException("Unable to read SQL model file: " + modelFile, e); + } + /* The whole file content is passed to withModel as a string; the SQL comes from the "sql" property. */ + SQLExecEnvironment.getEnvironment() + .withModel(model) + .executeSQL(dag, conf.get("sql")); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml new file mode 100644 index 0000000..77852e7 --- /dev/null +++ b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license
agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Kafka Operator Properties --> + <property> + <name>dt.operator.KafkaInput.prop.topics</name> + <value>dataTopic</value> + </property> + <property> + <name>dt.operator.KafkaInput.prop.clusters</name> + <value>localhost:9092</value> <!-- broker (NOT zookeeper) address --> + </property> + + <!-- CSV Parser Properties --> + <property> + <name>dt.operator.CSVParser.prop.schema</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + + <!-- SQL Properties --> + <property> + <name>sqlSchemaInputName</name> + <value>FROMCSV</value> + </property> + <property> + <name>sqlSchemaOutputName</name> + <value>TOFILE</value> + </property> + <property> + <name>folderPath</name> + <value>/tmp/output</value> + </property> + <property> + <name>fileName</name> + <value>output.txt</value> + </property> + <property> + <name>sqlSchemaOutputDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss 
Z"}},{"name":"Product","type":"String"}]}</value> + </property> + <property> + <name>sql</name> + <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml new file mode 100644 index 0000000..0d25aa6 --- /dev/null +++ b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- Input Definition: table name, CSV schema, Kafka broker and topic --> + <property> + <name>schemaInName</name> + <value>ORDERS</value> + </property> + <property> + <name>schemaInDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + <property> + <name>broker</name> + <value>localhost:9092</value> <!-- Kafka broker (NOT zookeeper) address --> + </property> + <property> + <name>topic</name> + <value>inputTopic</value> + </property> + + <!-- Output Definition: table name, CSV schema, output folder and file name --> + <property> + <name>schemaOutName</name> + <value>SALES</value> + </property> + <property> + <name>schemaOutDef</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value> + </property> + <property> + <name>outputFolder</name> + <value>/tmp/output</value> + </property> + <property> + <name>destFileName</name> + <value>out.file</value> + </property> + + <!-- Execution SQL --> + <property> + <name>sql</name> + <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml new file mode 100644 index 0000000..9ac49d4 --- /dev/null +++ b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml @@ -0,0 +1,43 @@ +<?xml
version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Input Definition --> + <property> + <name>csvSchemaInName</name> + <value>ORDERS</value> + </property> + <property> + <name>csvSchemaIn</name> + <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value> + </property> + <property> + <name>sourceFile</name> + <value>src/test/resources/input.csv</value> + </property> + + <!-- Execution SQL --> + <property> + <name>sql</name> + <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml new file mode 100644 index 0000000..ab026c2 --- /dev/null +++ 
b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml @@ -0,0 +1,32 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>modelFile</name> + <value>src/main/resources/model/model_file_csv.json</value> + </property> + <property> + <name>sql</name> + <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/META-INF/properties.xml b/demos/sql/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..6080bf6 --- /dev/null +++ b/demos/sql/src/main/resources/META-INF/properties.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Memory settings for all demos --> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>512</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/model/model_file_csv.json ---------------------------------------------------------------------- diff --git a/demos/sql/src/main/resources/model/model_file_csv.json b/demos/sql/src/main/resources/model/model_file_csv.json new file mode 100644 index 0000000..beba18d --- /dev/null +++ b/demos/sql/src/main/resources/model/model_file_csv.json @@ -0,0 +1,27 @@ +{ + "version": "1.0", + "defaultSchema": "APEX", + "schemas": [{ + "name": "APEX", + "tables": [ + { + "name": "ORDERS", + "type": "custom", + "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory", + "stream": { + "stream": true + }, + "operand": { + "endpoint": "file", + "messageFormat": "csv", + "endpointOperands": { + "directory": "src/test/resources/input.csv" + }, + "messageFormatOperands": { + "schema": 
"{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}" + } + } + } + ] + }] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java new file mode 100644 index 0000000..7208701 --- /dev/null +++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Runs {@link FusionStyleSQLApplication} in local mode against an embedded Kafka broker,
+ * publishes six CSV records to the input topic and compares the lines of the first file
+ * found in the output folder with the expected rows.
+ */
+public class FusionStyleSQLApplicationTest
+{
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+
+  // Instance field on purpose: JUnit creates a fresh instance per test method, so the
+  // method name appended in setUp() never accumulates across test methods.
+  private String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Switches the default time zone to GMT (restored in {@link #tearDown()}), starts the
+   * embedded Kafka broker with both topics and derives a per-test output folder.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    outputFolder += testName.getMethodName() + "/";
+  }
+
+  /**
+   * Stops the embedded Kafka broker and restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    kafka.stop();
+
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  /**
+   * Any exception now fails the test; previously it was caught and only printed, which
+   * let the test pass even when the application never ran.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));
+
+    conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
+    conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
+    conf.set("folderPath", outputFolder);
+    conf.set("fileName", "out.tmp");
+
+    FusionStyleSQLApplication app = new FusionStyleSQLApplication();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    boolean populated;
+    lc.runAsync();
+    try {
+      kafka.publish(testTopicData, Arrays.asList(
+          "15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13",
+          "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15",
+          "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      populated = PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000);
+    } finally {
+      // Stop the local cluster even when publishing or waiting fails.
+      lc.shutdown();
+    }
+    Assert.assertTrue("No output was written to " + outputFolder, populated);
+
+    // The output is read only after shutdown, as before.
+    String[] names = new File(outputFolder).list();
+    List<String> strings = FileUtils.readLines(new File(outputFolder, names[0]));
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+    String[] expectedLines = new String[] {
+      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+      "",
+      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+      ""};
+    Assert.assertArrayEquals(expectedLines, actualLines);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
new file mode 100644
index 0000000..f298059
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Runs {@link PureStyleSQLApplication} in local mode against an embedded Kafka broker,
+ * publishes six CSV records to the input topic and compares the lines of the first file
+ * found in the output folder with the expected rows.
+ */
+public class PureStyleSQLApplicationTest
+{
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+  // Instance field on purpose: JUnit creates a fresh instance per test method, so the
+  // method name appended in setUp() never accumulates across test methods.
+  private String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Switches the default time zone to GMT (restored in {@link #tearDown()}), starts the
+   * embedded Kafka broker with both topics and derives a per-test output folder.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    outputFolder += testName.getMethodName() + "/";
+  }
+
+  /**
+   * Stops the embedded Kafka broker and restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    kafka.stop();
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));
+
+    conf.set("broker", kafka.getBroker());
+    conf.set("topic", testTopicData);
+    conf.set("outputFolder", outputFolder);
+    conf.set("destFileName", "out.tmp");
+
+    PureStyleSQLApplication app = new PureStyleSQLApplication();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    boolean populated;
+    lc.runAsync();
+    try {
+      kafka.publish(testTopicData, Arrays.asList(
+          "15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13",
+          "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15",
+          "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      populated = waitTillFileIsPopulated(outputFolder, 40000);
+    } finally {
+      // Stop the local cluster even when publishing or waiting fails.
+      lc.shutdown();
+    }
+    Assert.assertTrue("No output was written to " + outputFolder, populated);
+
+    // The output is read only after shutdown, as before.
+    String[] names = new File(outputFolder).list();
+    List<String> strings = FileUtils.readLines(new File(outputFolder, names[0]));
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+
+    String[] expectedLines = new String[]{
+      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+      "",
+      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+      ""};
+
+    Assert.assertArrayEquals(expectedLines, actualLines);
+  }
+
+  /**
+   * Polls every 500 ms until the first entry listed in {@code outputFolder} contains at
+   * least one line, or until {@code timeout} milliseconds have elapsed.
+   *
+   * @param outputFolder local folder the application writes into
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true if the folder exists and a non-empty file was read from it
+   * @throws IOException if the file system cannot be opened or the file cannot be read
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+  {
+    boolean result;
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+      List<String> strings = Lists.newArrayList();
+      while (System.currentTimeMillis() - now < timeout) {
+        if (fs.exists(outDir)) {
+          // List the folder once per poll: File.list() returns null on an I/O error or when
+          // the path is not a directory, and two separate calls may see different contents.
+          String[] names = new File(outputFolder).list();
+          if (names != null && names.length > 0) {
+            strings = FileUtils.readLines(new File(outputFolder, names[0]));
+            if (!strings.isEmpty()) {
+              break;
+            }
+          }
+        }
+
+        Thread.sleep(500);
+      }
+
+      result = fs.exists(outDir) && !strings.isEmpty();
+    }
+
+    return result;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
new file mode 100644
index 0000000..6b1a404
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Runs {@link SQLApplicationWithAPI} in local mode with {@code System.out} redirected to a
+ * buffer and checks the captured lines that contain "Delta Record:".
+ */
+public class SQLApplicationWithAPITest
+{
+  private TimeZone defaultTZ;
+
+  /**
+   * Switches the default time zone to GMT; the expected record text below contains "GMT".
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  /**
+   * Restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));
+
+    SQLApplicationWithAPI app = new SQLApplicationWithAPI();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    boolean populated;
+    try {
+      lc.runAsync();
+      try {
+        populated = SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
+      } finally {
+        lc.shutdown();
+      }
+    } finally {
+      // Always undo the redirect, otherwise a failure here silences every later test.
+      System.setOut(originalSysout);
+    }
+    Assert.assertTrue("No \"Delta Record:\" line was printed within the timeout", populated);
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    String[] expectedFragments = new String[] {
+      "RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1",
+      "RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2",
+      "RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3",
+      "RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4",
+      "RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5",
+      "RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"};
+
+    // Check the count first so a short capture fails with a readable message instead of an
+    // ArrayIndexOutOfBoundsException.
+    Assert.assertTrue("Expected at least " + expectedFragments.length + " records but captured " + actualLines.length,
+        actualLines.length >= expectedFragments.length);
+    for (int i = 0; i < expectedFragments.length; i++) {
+      Assert.assertTrue("Unexpected record " + i + ": " + actualLines[i], actualLines[i].contains(expectedFragments[i]));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
new file mode 100644
index 0000000..7bbb8ec
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Runs {@link SQLApplicationWithModelFile} in local mode with {@code System.out} redirected
+ * to a buffer and checks the captured lines that contain "Delta Record:".
+ */
+public class SQLApplicationWithModelFileTest
+{
+  private TimeZone defaultTZ;
+
+  /**
+   * Switches the default time zone to GMT; the expected record text below contains "GMT".
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  /**
+   * Restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
+
+    SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    boolean populated;
+    try {
+      lc.runAsync();
+      try {
+        populated = waitTillStdoutIsPopulated(baos, 30000);
+      } finally {
+        lc.shutdown();
+      }
+    } finally {
+      // Always undo the redirect, otherwise a failure here silences every later test.
+      System.setOut(originalSysout);
+    }
+    Assert.assertTrue("No \"Delta Record:\" line was printed within the timeout", populated);
+
+    Collection<String> filter = deltaRecords(baos);
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    String[] expectedFragments = new String[] {
+      "RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1",
+      "RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2",
+      "RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3",
+      "RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4",
+      "RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5",
+      "RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"};
+
+    // Check the count first so a short capture fails with a readable message instead of an
+    // ArrayIndexOutOfBoundsException.
+    Assert.assertTrue("Expected at least " + expectedFragments.length + " records but captured " + actualLines.length,
+        actualLines.length >= expectedFragments.length);
+    for (int i = 0; i < expectedFragments.length; i++) {
+      Assert.assertTrue("Unexpected record " + i + ": " + actualLines[i], actualLines[i].contains(expectedFragments[i]));
+    }
+  }
+
+  /**
+   * Polls the captured output every 500 ms until at least one line containing
+   * "Delta Record:" is present, or until {@code timeout} milliseconds have elapsed.
+   *
+   * @param baos buffer that receives the redirected standard output
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true if at least one matching line was captured
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   * @throws IOException declared by {@code baos.flush()}
+   */
+  public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+    IOException
+  {
+    long now = System.currentTimeMillis();
+    Collection<String> filter = Lists.newArrayList();
+    while (System.currentTimeMillis() - now < timeout) {
+      baos.flush();
+      filter = deltaRecords(baos);
+      if (!filter.isEmpty()) {
+        break;
+      }
+
+      Thread.sleep(500);
+    }
+
+    return !filter.isEmpty();
+  }
+
+  /** Returns the captured output lines that contain "Delta Record:", in output order. */
+  private static Collection<String> deltaRecords(ByteArrayOutputStream baos)
+  {
+    String[] sout = baos.toString().split(System.lineSeparator());
+    return Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/input.csv b/demos/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/demos/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/log4j.properties
b/demos/sql/src/test/resources/log4j.properties new file mode 100644 index 0000000..8ea3cfe --- /dev/null +++ b/demos/sql/src/test/resources/log4j.properties @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=WARN +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=INFO +log4j.logger.org.apache.apex=INFO 
+ +log4j.logger.org.apache.calcite=WARN +log4j.logger.org.apache.kafka=WARN +log4j.logger.org.I0Itec.zkclient.ZkClient=WARN +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.kafka.consumer=WARN http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/sql/pom.xml ---------------------------------------------------------------------- diff --git a/sql/pom.xml b/sql/pom.xml index 24ef44c..c5d96c5 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -161,7 +161,6 @@ <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.0</version> - <optional>true</optional> <exclusions> <exclusion> <groupId>org.slf4j</groupId>
