Repository: apex-malhar Updated Branches: refs/heads/master f617d5e35 -> e01cf9c44
APEXMALHAR-2340 code changes to initialize the list of JdbcFieldInfo in JdbcPOJOInsertOutput from properties.xml Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e01cf9c4 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e01cf9c4 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e01cf9c4 Branch: refs/heads/master Commit: e01cf9c447a34791e6de5efb160f91ac20a87df9 Parents: f617d5e Author: Hitesh-Scorpio <[email protected]> Authored: Thu Nov 17 10:49:20 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Thu Nov 24 16:24:53 2016 +0530 ---------------------------------------------------------------------- .../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 27 +++- .../db/jdbc/JdbcPOJOInsertOutputOperator.java | 4 +- .../jdbc/JdbcPojoOperatorApplicationTest.java | 129 +++++++++++++++++++ library/src/test/resources/JdbcProperties.xml | 53 ++++++++ 4 files changed, 209 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java index 38d44a0..99b14da 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java @@ -25,10 +25,12 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.util.ArrayList; import java.util.List; import javax.validation.constraints.NotNull; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +63,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort; @org.apache.hadoop.classification.InterfaceStability.Evolving public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> { - private List<JdbcFieldInfo> fieldInfos; - + private List<JdbcFieldInfo> fieldInfos = new ArrayList<>(); protected List<Integer> columnDataTypes; @NotNull @@ -295,4 +296,26 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac } } + /** + * Function to initialize the list of {@link JdbcFieldInfo} from properties.xml + * @param index + * @param value + */ + public void setFieldInfosItem(int index, String value) + { + try { + JSONObject jo = new JSONObject(value); + JdbcFieldInfo jdbcFieldInfo = new JdbcFieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"), + FieldInfo.SupportType.valueOf(jo.getString("type")), jo.getInt("sqlType")); + final int need = index - fieldInfos.size() + 1; + for (int i = 0; i < need; i++) { + fieldInfos.add(null); + } + fieldInfos.set(index,jdbcFieldInfo); + } catch (Exception e) { + throw new RuntimeException("Exception in setting JdbcFieldInfo"); + } + } + + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java index 706757a..8fe20fe 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java @@ -67,7 +67,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator * columnNamesSet is the set having column names given by the user */ HashSet<String> columnNamesSet = new HashSet<>(); - if (getFieldInfos() == null) { // then assume direct mapping + if (getFieldInfos() == null || getFieldInfos().size() == 0) { // then assume direct mapping LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns"); } else { // FieldInfo supplied by user @@ -93,7 +93,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator @Override public void activate(OperatorContext context) { - if (getFieldInfos() == null) { + if (getFieldInfos() == null || getFieldInfos().size() == 0) { Field[] fields = pojoClass.getDeclaredFields(); // Create fieldInfos in case of direct mapping List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java new file mode 100644 index 0000000..fe43327 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorApplicationTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.StramLocalCluster; + +public class JdbcPojoOperatorApplicationTest extends JdbcOperatorTest +{ + public static int TupleCount; + public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj; + + public int getNumOfRowsinTable(String tableName) + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String countQuery = "SELECT count(*) from " + tableName; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml")); + lma.prepareDAG(new JdbcPojoOperatorApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TupleCount == 10; + } + }); + lc.run(10000);// runs for 10 seconds and quits + Assert.assertEquals("rows in db", 10, getNumOfRowsinTable(TABLE_POJO_NAME)); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class JdbcPojoOperatorApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + JdbcPOJOInsertOutputOperator jdbc = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + outputStore.setDatabaseDriver(DB_DRIVER); + outputStore.setDatabaseUrl(URL); + jdbc.setStore(outputStore); + jdbc.setBatchSize(3); + jdbc.setTablename(TABLE_POJO_NAME); + dag.getMeta(jdbc).getMeta(jdbc.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + cleanTable(); + JdbcPojoEmitterOperator input = dag.addOperator("data", new JdbcPojoEmitterOperator()); + dag.addStream("pojo", input.output, jdbc.input); + } + } + + public static class JdbcPojoEmitterOperator extends BaseOperator implements InputOperator + { + public static int emitTuple = 10; + public final transient DefaultOutputPort<TestPOJOEvent> output = new DefaultOutputPort<TestPOJOEvent>(); + + @Override + public void emitTuples() + { + if (emitTuple > 0) { + output.emit(new TestPOJOEvent(emitTuple,"test" + emitTuple)); + emitTuple--; + TupleCount++; + } + } + + @Override + public void setup(Context.OperatorContext context) + { + TupleCount = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e01cf9c4/library/src/test/resources/JdbcProperties.xml ---------------------------------------------------------------------- diff --git a/library/src/test/resources/JdbcProperties.xml b/library/src/test/resources/JdbcProperties.xml new file mode 100644 index 0000000..3e76cf7 --- /dev/null +++ b/library/src/test/resources/JdbcProperties.xml @@ -0,0 +1,53 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + + <property> + <name>dt.operator.JdbcOutput.fieldInfosItem[0]</name> + <value> + { + "sqlType": 0, + "columnName":"ID", + "pojoFieldExpression": "id", + "type":"INTEGER" + } + + </value> + </property> + + + <property> + <name>dt.operator.JdbcOutput.fieldInfosItem[1]</name> + <value> + { + "sqlType": 0, + "columnName":"NAME", + "pojoFieldExpression": "name", + "type":"STRING" + } + + </value> + </property> + + + +</configuration>
