Repository: apex-malhar Updated Branches: refs/heads/master a5e8fa3fa -> ee77dc654
APEXMALHAR-2344 code changes to initialize the list of FieldInfo from properties.xml Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ee77dc65 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ee77dc65 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ee77dc65 Branch: refs/heads/master Commit: ee77dc654692c6a780c9f54a98f838c5f14e6527 Parents: a5e8fa3 Author: Hitesh-Scorpio <[email protected]> Authored: Mon Nov 28 19:48:31 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Thu Dec 1 12:21:58 2016 +0530 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcPOJOInputOperator.java | 36 +++- .../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 37 +++- .../jdbc/JdbcInputOperatorApplicationTest.java | 167 +++++++++++++++++++ library/src/test/resources/JdbcProperties.xml | 131 +++++++++++++++ 4 files changed, 369 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java index 59c2807..e587d85 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -27,12 +27,14 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +82,7 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> private boolean mysqlSyntax; @NotNull - private List<FieldInfo> fieldInfos; + private List<FieldInfo> fieldInfos = new ArrayList<>();; @Min(1) private int fetchSize; @@ -632,5 +634,37 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> this.mysqlSyntax = mysqlSyntax; } + /** + * Function to initialize the list of {@link FieldInfo} externally from configuration/properties file. + * Example entry in the properties/configuration file: + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name> + <value> + { + "columnName":"ID", + "pojoFieldExpression": "id", + "type":"INTEGER" + } + </value> + </property> + * @param index is the index in the list which is to be initialized. + * @param value is the JSON String with appropriate mappings for {@link FieldInfo}. + */ + public void setFieldInfosItem(int index, String value) + { + try { + JSONObject jo = new JSONObject(value); + FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"), + FieldInfo.SupportType.valueOf(jo.getString("type"))); + final int need = index - fieldInfos.size() + 1; + for (int i = 0; i < need; i++) { + fieldInfos.add(null); + } + fieldInfos.set(index,fieldInfo); + } catch (Exception e) { + throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage()); + } + } + public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java index 62618de..129981a 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java @@ -27,11 +27,13 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.validation.constraints.NotNull; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +67,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj protected List<Integer> columnDataTypes; protected transient Class<?> pojoClass; @NotNull - private List<FieldInfo> fieldInfos; + private List<FieldInfo> fieldInfos = new ArrayList<>(); @OutputPortFieldAnnotation(schemaRequired = true) public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() @@ -323,5 +325,38 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj this.fieldInfos = fieldInfos; } + /** + * Function to initialize the list of {@link FieldInfo} externally from configuration/properties file. + * Example entry in the properties/configuration file: + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name> + <value> + { + "columnName":"ID", + "pojoFieldExpression": "id", + "type":"INTEGER" + } + </value> + </property> + * @param index is the index in the list which is to be initialized. + * @param value is the JSON String with appropriate mappings for {@link FieldInfo}. + */ + public void setFieldInfosItem(int index, String value) + { + + try { + JSONObject jo = new JSONObject(value); + FieldInfo fieldInfo = new FieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"), + FieldInfo.SupportType.valueOf(jo.getString("type"))); + final int need = index - fieldInfos.size() + 1; + for (int i = 0; i < need; i++) { + fieldInfos.add(null); + } + fieldInfos.set(index,fieldInfo); + } catch (Exception e) { + throw new RuntimeException("Exception in setting FieldInfo " + value + " " + e.getMessage()); + } + } + private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java new file mode 100644 index 0000000..c59978a --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcInputOperatorApplicationTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Tests to check if setting List of {@link FieldInfo} externally from + * configuration file works fine for Jdbc input operators. + */ +public class JdbcInputOperatorApplicationTest extends JdbcOperatorTest +{ + public static int TupleCount; + + public int getNumOfRowsinTable(String tableName) + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String countQuery = "SELECT count(*) from " + tableName; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + public void testApplication(StreamingApplication streamingApplication) throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml")); + lma.prepareDAG(streamingApplication, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TupleCount == 10; + } + }); + lc.run(10000);// runs for 10 seconds and quits + Assert.assertEquals("rows in db", TupleCount, getNumOfRowsinTable(TABLE_POJO_NAME)); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + + } + + @Test + public void testJdbcPOJOPollInputOperatorApplication() throws Exception + { + testApplication(new JdbcPOJOPollInputOperatorApplication()); + } + + @Test + public void testJdbcPOJOInputOperatorApplication() throws Exception + { + testApplication(new JdbcPOJOInputOperatorApplication()); + } + + + public static class JdbcPOJOInputOperatorApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + cleanTable(); + insertEvents(10, true, 0); + JdbcPOJOInputOperator inputOperator = dag.addOperator("JdbcPOJOInput", new JdbcPOJOInputOperator()); + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + inputOperator.setFetchSize(100); + dag.getMeta(inputOperator).getMeta(inputOperator.outputPort).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + ResultCollector result = dag.addOperator("result", new ResultCollector()); + dag.addStream("pojo", inputOperator.outputPort, result.input); + } + } + + public static class JdbcPOJOPollInputOperatorApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + cleanTable(); + insertEvents(10, true, 0); + JdbcPOJOPollInputOperator inputOperator = dag.addOperator("JdbcPOJOPollInput", new JdbcPOJOPollInputOperator()); + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + inputOperator.setKey("id"); + inputOperator.setFetchSize(100); + inputOperator.setBatchSize(100); + inputOperator.setPartitionCount(2); + dag.getMeta(inputOperator).getMeta(inputOperator.outputPort).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + ResultCollector result = dag.addOperator("result", new ResultCollector()); + dag.addStream("pojo", inputOperator.outputPort, result.input); + } + } + + public static class ResultCollector extends BaseOperator + { + public final transient DefaultInputPort<java.lang.Object> input = new DefaultInputPort<Object>() + { + @Override + public void process(java.lang.Object in) + { + TestPOJOEvent obj = (TestPOJOEvent)in; + TupleCount++; + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + TupleCount = 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ee77dc65/library/src/test/resources/JdbcProperties.xml ---------------------------------------------------------------------- diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml index 3e76cf7..431c113 100644 --- a/library/src/test/resources/JdbcProperties.xml +++ b/library/src/test/resources/JdbcProperties.xml @@ -48,6 +48,137 @@ </value> </property> + <property> + <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[0]</name> + <value> + { + "columnName":"ID", + "pojoFieldExpression": "id", + "type":"INTEGER" + } + + </value> + </property> + + + <property> + <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[1]</name> + <value> + { + "columnName":"STARTDATE", + "pojoFieldExpression": "startDate", + "type":"STRING" + } + + </value> + </property> + <property> + <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[2]</name> + <value> + { + "columnName":"STARTTIME", + "pojoFieldExpression": "startTime", + "type":"STRING" + } + + </value> + </property> + + <property> + <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[3]</name> + <value> + { + "columnName":"STARTTIMESTAMP", + "pojoFieldExpression": "startTimestamp", + "type":"STRING" + } + + </value> + </property> + + <property> + <name>dt.operator.JdbcPOJOPollInput.fieldInfosItem[4]</name> + <value> + { + "columnName":"SCORE", + "pojoFieldExpression": "score", + "type":"DOUBLE" + } + + </value> + </property> + + <property> + <name>dt.operator.JdbcPOJOPollInput.columnsExpression</name> + <value> + id,startDate,startTime,startTimestamp,score + </value> + </property> + + + + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[0]</name> + <value> + { + "columnName":"ID", + "pojoFieldExpression": "id", + "type":"INTEGER" + } + + </value> + </property> + + + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[1]</name> + <value> + { + "columnName":"STARTDATE", + "pojoFieldExpression": "startDate", + "type":"STRING" + } + + </value> + </property> + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[2]</name> + <value> + { + "columnName":"STARTTIME", + "pojoFieldExpression": "startTime", + "type":"STRING" + } + + </value> + </property> + + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[3]</name> + <value> + { + "columnName":"STARTTIMESTAMP", + "pojoFieldExpression": "startTimestamp", + "type":"STRING" + } + + </value> + </property> + + <property> + <name>dt.operator.JdbcPOJOInput.fieldInfosItem[4]</name> + <value> + { + "columnName":"SCORE", + "pojoFieldExpression": "score", + "type":"DOUBLE" + } + + </value> + </property> + + + </configuration>
