Repository: apex-malhar Updated Branches: refs/heads/master a05980579 -> c92ca15e8
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
new file mode 100644
index 0000000..62d2a4d
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Checks that SQL statements can be turned into a {@link LogicalPlan} that passes
+ * {@code validate()}. The tests only build and validate the DAG; none of them launches it.
+ */
+public class SerDeTest
+{
+  /**
+   * Plans a projection over ORDERS, with the table supplied through the JSON model file
+   * rather than registered in code.
+   */
+  @Test
+  public void testSQLWithApexFactory() throws IOException, ClassNotFoundException
+  {
+    File modelFile = new File("src/test/resources/model/model_file_csv.json");
+    // Read with an explicit charset so the result does not depend on the platform default.
+    String model = FileUtils.readFileToString(modelFile, "UTF-8");
+
+    LogicalPlan dag = new LogicalPlan();
+    SQLExecEnvironment.getEnvironment()
+      .withModel(model)
+      .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
+
+    dag.validate();
+  }
+
+  /**
+   * Plans a filtered projection over ORDERS, registered programmatically as a CSV-formatted
+   * {@link FileEndpoint}.
+   */
+  @Test
+  public void testSQLWithAPI() throws ClassNotFoundException, IOException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schema = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+
+    Endpoint endpoint = new FileEndpoint("dummyFilePath", new CSVMessageFormat(schema));
+
+    SQLExecEnvironment.getEnvironment()
+      .registerTable("ORDERS", endpoint)
+      .executeSQL(dag, "SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) FROM ORDERS WHERE id > 3");
+
+    dag.validate();
+  }
+
+  /**
+   * Plans an {@code INSERT INTO ... SELECT} between two CSV-formatted file endpoints,
+   * ORDERS as the source and SALES as the destination.
+   */
+  @Test
+  public void testSQLSelectInsertWithAPI() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+    SQLExecEnvironment.getEnvironment()
+      .registerTable("ORDERS", new FileEndpoint("dummyFilePathInput", new CSVMessageFormat(schemaIn)))
+      .registerTable("SALES", new FileEndpoint("dummyFilePathOutput", "out.tmp", new CSVMessageFormat(schemaOut)))
+      .executeSQL(dag, "INSERT INTO SALES SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) " +
+      "FROM ORDERS WHERE id > 3");
+
+    dag.validate();
+  }
+
+  /**
+   * Plans a two-table join between Kafka endpoints with the row filter in the
+   * {@code WHERE} clause, using the registered APEXCONCAT function in the projection.
+   */
+  @Test
+  public void testJoin() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+    String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+      "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+      "FROM ORDERS AS A " +
+      "JOIN CATEGORY AS B ON A.id = B.id " +
+      "WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'";
+
+    SQLExecEnvironment.getEnvironment()
+      .registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat(schemaIn0)))
+      .registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat(schemaIn1)))
+      .registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat(schemaOut)))
+      .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+      .executeSQL(dag, sql);
+
+    dag.validate();
+  }
+
+  /**
+   * Same join as {@link #testJoin()}, but with the {@code id > 3} predicate moved into the
+   * {@code ON} condition.
+   */
+  @Test
+  public void testJoinFilter() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+    String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+    // Every fragment except the last ends with a space so that concatenation cannot fuse
+    // two tokens together (e.g. "3" + "WHERE").
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+      "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+      "FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3 " +
+      "WHERE A.PRODUCT LIKE 'paint%'";
+
+    SQLExecEnvironment.getEnvironment()
+      .registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat(schemaIn0)))
+      .registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat(schemaIn1)))
+      .registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat(schemaOut)))
+      .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+      .executeSQL(dag, sql);
+
+    dag.validate();
+  }
+
+  /**
+   * Wires Kafka input -> CSV parser and CSV formatter -> Kafka output by hand, then registers
+   * the parser's output port and the formatter's input port as the ORDERS and SALES tables.
+   */
+  @Test
+  public void testPortEndpoint() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+    kafkaInput.setTopics("testdata0");
+    kafkaInput.setInitialOffset("EARLIEST");
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+    kafkaInput.setConsumerProps(props);
+    kafkaInput.setClusters("localhost:9092");
+
+    CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+    csvParser.setSchema(schemaIn);
+
+    dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+    CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+    formatter.setSchema(schemaOut);
+
+    KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+    kafkaOutput.setTopic("testresult");
+
+    props = new Properties();
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    kafkaOutput.setProperties(props);
+
+    dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
+    SQLExecEnvironment.getEnvironment()
+      .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+      .registerTable("SALES", new StreamEndpoint(formatter.in, OutputPOJO.class))
+      .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+      .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+      "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+      "PRODUCT LIKE 'paint%'");
+
+    dag.validate();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
new file mode 100644
index 0000000..1e9a1f8
--- /dev/null
+++ 
b/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+
+/**
+ * End-to-end test that runs a SQL statement between two {@link StreamEndpoint}s, with an
+ * {@link EmbeddedKafka} broker feeding the input topic and collecting the result topic.
+ * The JVM default time zone is set to GMT for the duration of each test.
+ */
+public class StreamEndpointTest
+{
+  private String testTopicData0 = "dataTopic0";
+  private String testTopicResult = "resultTopic";
+
+  private EmbeddedKafka kafka;
+
+  private TimeZone defaultTZ;
+
+  @Before
+  public void setup() throws IOException
+  {
+    // Switch the default zone to GMT for the test; the original zone is restored in tearDown().
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData0);
+    kafka.createTopic(testTopicResult);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    try {
+      if (kafka != null) {
+        kafka.stop();
+      }
+    } finally {
+      // Put the original default zone back even when stopping the broker throws.
+      TimeZone.setDefault(defaultTZ);
+    }
+  }
+
+  /**
+   * Publishes six CSV rows, runs the application against them, and expects exactly the two
+   * rows with id > 3 and a product starting with "paint" on the result topic.
+   */
+  @Test
+  public void testApplicationWithPortEndpoint()
+  {
+    LocalMode.Controller lc = null;
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaPortApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf);
+      lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char.
+      String[] expectedLines = new String[] {"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+
+      Assert.assertArrayEquals(expectedLines, consume.toArray(new String[consume.size()]));
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace shows up in the report.
+      throw new AssertionError("Exception: " + e, e);
+    } finally {
+      // Stop the local cluster on every path, including assertion failures.
+      if (lc != null) {
+        lc.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Application under test. It builds Kafka input -> CSV parser and CSV formatter -> Kafka
+   * output, and registers the parser's output port and the formatter's input port as the
+   * ORDERS and SALES tables of the SQL statement.
+   */
+  public static class KafkaPortApplication implements StreamingApplication
+  {
+    private String broker;
+    private String sourceTopic;
+    private String destTopic;
+
+    /**
+     * @param broker      Kafka bootstrap address used for both the consumer and the producer
+     * @param sourceTopic topic the input operator reads from
+     * @param destTopic   topic the output operator writes to
+     */
+    public KafkaPortApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+      KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+      kafkaInput.setTopics(sourceTopic);
+      kafkaInput.setInitialOffset("EARLIEST");
+      Properties props = new Properties();
+      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+      kafkaInput.setConsumerProps(props);
+      kafkaInput.setClusters(broker);
+
+      CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+      csvParser.setSchema(schemaIn);
+
+      dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+      CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+      formatter.setSchema(schemaOut);
+
+      KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+      kafkaOutput.setTopic(destTopic);
+
+      props = new Properties();
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      kafkaOutput.setProperties(props);
+
+      dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
+      // SALES is described by a field-name -> type map here instead of a POJO class.
+      SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+        .registerTable("SALES", new StreamEndpoint(formatter.in,
+          ImmutableMap.<String, Class>of("RowTime1", Date.class, "RowTime2", Date.class, "Product", String.class)))
+        .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+        .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+          "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+          "PRODUCT LIKE 'paint%'");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java 
b/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java index 8fcb7f8..b5cd378 100644 --- a/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java +++ b/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java @@ -1,6 +1,20 @@ /** - * Copyright (c) 2015 DataTorrent, Inc. - * All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.apex.malhar.sql.codegen; @@ -17,6 +31,7 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.apex.malhar.lib.utils.ClassLoaderUtils; import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry; import static org.junit.Assert.assertEquals; @@ -60,7 +75,7 @@ public class BeanClassGeneratorTest byte[] beanClass = BeanClassGenerator.createAndWriteBeanClass(addressClassName, schema.fieldList); - Class<?> clazz = BeanClassGenerator.readBeanClass(addressClassName, beanClass); + Class<?> clazz = ClassLoaderUtils.readBeanClass(addressClassName, beanClass); Object o = clazz.newInstance(); Field f = clazz.getDeclaredField("streetNumber"); @@ -91,7 +106,7 @@ public class BeanClassGeneratorTest byte[] beanClass = BeanClassGenerator.createAndWriteBeanClass(addressClassName, schema.fieldList); - Class<?> clazz = BeanClassGenerator.readBeanClass(addressClassName, beanClass); + Class<?> clazz = ClassLoaderUtils.readBeanClass(addressClassName, beanClass); Object o = clazz.newInstance(); Field f = clazz.getDeclaredField("streetNumber"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/input.csv ---------------------------------------------------------------------- diff --git a/sql/src/test/resources/input.csv b/sql/src/test/resources/input.csv new file mode 100644 index 0000000..c4786d1 --- /dev/null +++ b/sql/src/test/resources/input.csv @@ -0,0 +1,6 @@ +15/02/2016 10:15:00 +0000,1,paint1,11 +15/02/2016 10:16:00 +0000,2,paint2,12 +15/02/2016 10:17:00 +0000,3,paint3,13 +15/02/2016 10:18:00 +0000,4,paint4,14 +15/02/2016 10:19:00 +0000,5,paint5,15 +15/02/2016 10:10:00 +0000,6,abcde6,16 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/sql/src/test/resources/log4j.properties b/sql/src/test/resources/log4j.properties new file 
mode 100644 index 0000000..8ea3cfe --- /dev/null +++ b/sql/src/test/resources/log4j.properties @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=WARN +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=INFO +log4j.logger.org.apache.apex=INFO + +log4j.logger.org.apache.calcite=WARN 
+log4j.logger.org.apache.kafka=WARN +log4j.logger.org.I0Itec.zkclient.ZkClient=WARN +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.kafka.consumer=WARN http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/model/model_file_csv.json ---------------------------------------------------------------------- diff --git a/sql/src/test/resources/model/model_file_csv.json b/sql/src/test/resources/model/model_file_csv.json new file mode 100644 index 0000000..beba18d --- /dev/null +++ b/sql/src/test/resources/model/model_file_csv.json @@ -0,0 +1,27 @@ +{ + "version": "1.0", + "defaultSchema": "APEX", + "schemas": [{ + "name": "APEX", + "tables": [ + { + "name": "ORDERS", + "type": "custom", + "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory", + "stream": { + "stream": true + }, + "operand": { + "endpoint": "file", + "messageFormat": "csv", + "endpointOperands": { + "directory": "src/test/resources/input.csv" + }, + "messageFormatOperands": { + "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}" + } + } + } + ] + }] +}
