APEXMALHAR-1953: Added JdbcPOJONonInsertOutputOperator for update / delete / merge queries. Added error port and error tuple handling to AbstractJdbcTransactionableOutputOperator, added AutoMetrics, added unit tests, and refactored accordingly.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ddd5bcf1 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ddd5bcf1 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ddd5bcf1 Branch: refs/heads/master Commit: ddd5bcf1a80d22d0e2727868339757dcce3bb2b8 Parents: f54ba32 Author: bhupesh <[email protected]> Authored: Thu Mar 17 18:24:08 2016 +0530 Committer: bhupesh <[email protected]> Committed: Fri Jul 1 11:53:43 2016 +0530 ---------------------------------------------------------------------- .../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 29 ++-- ...stractJdbcTransactionableOutputOperator.java | 77 ++++++++- .../datatorrent/lib/db/jdbc/JdbcFieldInfo.java | 58 +++++++ .../db/jdbc/JdbcPOJOInsertOutputOperator.java | 19 ++- .../jdbc/JdbcPOJONonInsertOutputOperator.java | 76 +++++++++ .../com/datatorrent/lib/db/jdbc/JdbcIOApp.java | 15 +- .../lib/db/jdbc/JdbcOperatorTest.java | 171 ++++++++++++++++++- 7 files changed, 411 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java index c310a40..45e0cbb 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java @@ -38,7 +38,6 @@ import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import 
com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.Getter; import com.datatorrent.lib.util.PojoUtils.GetterBoolean; @@ -61,7 +60,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort; @org.apache.hadoop.classification.InterfaceStability.Evolving public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> { - private List<FieldInfo> fieldInfos; + private List<JdbcFieldInfo> fieldInfos; protected List<Integer> columnDataTypes; @@ -142,15 +141,15 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac break; case Types.TIMESTAMP: - statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); + statement.setTimestamp(i + 1, ((Getter<Object, Timestamp>)activeFieldInfo.setterOrGetter).get(tuple)); break; case Types.TIME: - statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); + statement.setTime(i + 1, ((Getter<Object, Time>)activeFieldInfo.setterOrGetter).get(tuple)); break; case Types.DATE: - statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); + statement.setDate(i + 1, ((Getter<Object, Date>)activeFieldInfo.setterOrGetter).get(tuple)); break; default: @@ -169,7 +168,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac /** * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. 
*/ - public List<FieldInfo> getFieldInfos() + public List<JdbcFieldInfo> getFieldInfos() { return fieldInfos; } @@ -182,7 +181,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac * @description $[].pojoFieldExpression pojo field name or expression * @useSchema $[].pojoFieldExpression input.fields[].name */ - public void setFieldInfos(List<FieldInfo> fieldInfos) + public void setFieldInfos(List<JdbcFieldInfo> fieldInfos) { this.fieldInfos = fieldInfos; } @@ -195,6 +194,10 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac return tablename; } + /** + * Set the target table name in database + * @param tablename + */ public void setTablename(String tablename) { this.tablename = tablename; @@ -259,18 +262,18 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac break; case Types.TIMESTAMP: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class); break; case Types.TIME: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class); break; case Types.DATE: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class); break; default: http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java index d3300fc..0a7f3fd 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java @@ -18,8 +18,10 @@ */ package com.datatorrent.lib.db.jdbc; +import java.sql.BatchUpdateException; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import java.util.List; import javax.validation.constraints.Min; @@ -28,10 +30,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; - +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator; /** @@ -69,6 +73,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> private transient int batchStartIdx; private transient PreparedStatement updateCommand; + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> error = new DefaultOutputPort<>(); + + @AutoMetric + private int tuplesWrittenSuccessfully; + @AutoMetric + private int errorTuples; + public AbstractJdbcTransactionableOutputOperator() { tuples = Lists.newArrayList(); @@ -95,6 +107,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> } @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + tuplesWrittenSuccessfully = 0; + errorTuples = 0; + } + + @Override public void endWindow() { if (tuples.size() - batchStartIdx > 0) { @@ -129,10 +149,47 @@ public 
abstract class AbstractJdbcTransactionableOutputOperator<T> } updateCommand.executeBatch(); updateCommand.clearBatch(); + batchStartIdx += tuples.size() - batchStartIdx; + } catch (BatchUpdateException bue) { + logger.error(bue.getMessage()); + processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx); } catch (SQLException e) { throw new RuntimeException("processing batch", e); - } finally { - batchStartIdx += tuples.size() - batchStartIdx; + } + } + + /** + * Identify which commands in the batch failed and redirect these on the error port. + * See https://docs.oracle.com/javase/7/docs/api/java/sql/BatchUpdateException.html for more details + * + * @param updateCounts + * @param commandsInBatch + */ + private void processUpdateCounts(int[] updateCounts, int commandsInBatch) + { + if (updateCounts.length < commandsInBatch) { + // Driver chose not to continue processing after failure. + error.emit(tuples.get(updateCounts.length + batchStartIdx)); + errorTuples++; + // In this case, updateCounts is the number of successful queries + tuplesWrittenSuccessfully += updateCounts.length; + // Skip the error record + batchStartIdx += updateCounts.length + 1; + // And process the remaining if any + if ((tuples.size() - batchStartIdx) > 0) { + processBatch(); + } + } else { + // Driver processed all batch statements in spite of failures. + // Pick out the failures and send on error port. 
+ tuplesWrittenSuccessfully = commandsInBatch; + for (int i = 0; i < commandsInBatch; i++) { + if (updateCounts[i] == Statement.EXECUTE_FAILED) { + error.emit(tuples.get(i + batchStartIdx)); + errorTuples++; + tuplesWrittenSuccessfully--; + } + } } } @@ -163,6 +220,20 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> */ protected abstract void setStatementParameters(PreparedStatement statement, T tuple) throws SQLException; + public int getTuplesWrittenSuccessfully() + { + return tuplesWrittenSuccessfully; + } + + /** + * Setter for metric tuplesWrittenSuccessfully + * @param tuplesWrittenSuccessfully + */ + public void setTuplesWrittenSuccessfully(int tuplesWrittenSuccessfully) + { + this.tuplesWrittenSuccessfully = tuplesWrittenSuccessfully; + } + private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java new file mode 100644 index 0000000..5c7e6e8 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import com.datatorrent.lib.util.FieldInfo; + +/** + * A {@link FieldInfo} object for Jdbc. <br/> + * Includes an SQL type for each field. <br/> + * An {@link FieldInfo} object used for JDBC output sources must have the SQL data types. + * This is needed to create correct getters and setters for the POJO, + * as well as setting the right parameter types in the JDBC prepared statement. + */ +public class JdbcFieldInfo extends FieldInfo +{ + private int sqlType; + + public JdbcFieldInfo() + { + } + + public JdbcFieldInfo(String columnName, String pojoFieldExpression, SupportType type, int sqlType) + { + super(columnName, pojoFieldExpression, type); + + this.sqlType = sqlType; + } + + public int getSqlType() + { + return sqlType; + } + + /** + * Set the sql data type for this {@link JdbcFieldInfo} + * @param sqlType + */ + public void setSqlType(int sqlType) + { + this.sqlType = sqlType; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java index 09bab2f..67ec023 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java @@ -23,6 
+23,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.List; import org.slf4j.Logger; @@ -60,7 +61,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator // Populate columnNames and columnDataTypes try { if (getFieldInfos() == null) { // then assume direct mapping - LOG.info("Assuming direct mapping between POJO fields and DB columns"); + LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns"); populateColumnDataTypes(null); } else { // FieldInfo supplied by user @@ -84,20 +85,20 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator @Override public void activate(OperatorContext context) { - if(getFieldInfos() == null) { + if (getFieldInfos() == null) { Field[] fields = pojoClass.getDeclaredFields(); // Create fieldInfos in case of direct mapping - List<FieldInfo> fieldInfos = Lists.newArrayList(); + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); for (int i = 0; i < columnNames.size(); i++) { String columnName = columnNames.get(i); String pojoField = getMatchingField(fields, columnName); - if(columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls && - (pojoField == null || pojoField.length() == 0)) { - throw new RuntimeException("Data for a non-nullable field not found in POJO"); + if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls && + (pojoField == null || pojoField.length() == 0)) { + throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO"); } else { - if(pojoField != null && pojoField.length() != 0) { - FieldInfo fi = new FieldInfo(columnName, pojoField, null); + if (pojoField != null && pojoField.length() != 0) { + JdbcFieldInfo fi = new JdbcFieldInfo(columnName, pojoField, null, Types.NULL); fieldInfos.add(fi); } else { columnDataTypes.remove(i); @@ -138,7 +139,7 @@ public class 
JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator private String getMatchingField(Field[] fields, String columnName) { for (Field f: fields) { - if(f.getName().equalsIgnoreCase(columnName)) { + if (f.getName().equalsIgnoreCase(columnName)) { return f.getName(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java new file mode 100644 index 0000000..82ff043 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; + +/** + * <p> + * JdbcPOJONonInsertOutputOperator class.</p> + * An implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO. + * + * @displayName Jdbc Output Operator + * @category Output + * @tags database, sql, pojo, jdbc + * @since 2.1.0 + */ [email protected] +public class JdbcPOJONonInsertOutputOperator extends AbstractJdbcPOJOOutputOperator +{ + @NotNull + String sqlStatement; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + + columnDataTypes = Lists.newArrayList(); + for (JdbcFieldInfo fi : getFieldInfos()) { + columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi)); + columnDataTypes.add(fi.getSqlType()); + } + } + + @Override + protected String getUpdateCommand() + { + return sqlStatement; + } + + /** + * Sets the parameterized SQL query for the JDBC update operation. + * This can be an update, delete or a merge query. + * Example: "update testTable set id = ? where name = ?" 
+ * @param sqlStatement the SQL query + */ + public void setSqlStatement(String sqlStatement) + { + this.sqlStatement = sqlStatement; + } + + private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJONonInsertOutputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java index 4675cdb..dc695eb 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java @@ -18,6 +18,7 @@ */ package com.datatorrent.lib.db.jdbc; +import java.sql.Types; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -49,12 +50,12 @@ public class JdbcIOApp implements StreamingApplication dag.getMeta(jdbcInputOperator).getMeta(jdbcInputOperator.outputPort).getAttributes() .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class); - JdbcPOJOOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator()); + JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator()); JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); outputStore.setDatabaseDriver("org.hsqldb.jdbcDriver"); outputStore.setDatabaseUrl("jdbc:hsqldb:mem:test"); jdbcOutputOperator.setStore(outputStore); - jdbcOutputOperator.setFieldInfos(addFieldInfos()); + jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos()); jdbcOutputOperator.setTablename("test_app_output_event_table"); jdbcOutputOperator.setBatchSize(10); dag.getMeta(jdbcOutputOperator).getMeta(jdbcOutputOperator.input).getAttributes() @@ -72,4 +73,14 @@ public class JdbcIOApp implements StreamingApplication fieldInfos.add(new FieldInfo("AMOUNT", "amount", 
SupportType.INTEGER)); return fieldInfos; } + + private List<JdbcFieldInfo> addJdbcFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER, Types.INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", SupportType.STRING, Types.VARCHAR)); + fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER, Types.INTEGER)); + return fieldInfos; + } + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 26196f5..e432ab3 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.sql.Types; import java.util.List; import javax.annotation.Nonnull; @@ -44,6 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.helper.TestPortContext; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.TestUtils; import com.datatorrent.netlet.util.DTThrowable; /** @@ -165,10 +167,10 @@ public class JdbcOperatorTest + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTable); String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF - + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))"; + + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))"; 
stmt.executeUpdate(createPOJOTableIdDiff); String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF - + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))"; + + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTableNameDiff); } catch (Throwable e) { DTThrowable.rethrow(e); @@ -290,6 +292,46 @@ public class JdbcOperatorTest } + private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator + { + public TestPOJONonInsertOutputOperator() + { + cleanTable(); + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + public int getDistinctNonUnique() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(distinct(name)) from " + TABLE_POJO_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + } + private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent> { @@ -369,9 +411,9 @@ public class JdbcOperatorTest outputOperator.setBatchSize(3); outputOperator.setTablename(TABLE_POJO_NAME); - List<FieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("ID", "id", null)); - fieldInfos.add(new FieldInfo("NAME", "name", null)); + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR)); 
outputOperator.setFieldInfos(fieldInfos); outputOperator.setStore(transactionalStore); @@ -401,6 +443,7 @@ public class JdbcOperatorTest /** * This test will assume direct mapping for POJO fields to DB columns + * All fields in DB present in POJO */ @Test public void testJdbcPojoInsertOutputOperator() @@ -428,12 +471,21 @@ public class JdbcOperatorTest TestPortContext tpc = new TestPortContext(portAttributes); outputOperator.input.setup(tpc); + CollectorTestSink<Object> errorSink = new CollectorTestSink<>(); + TestUtils.setSink(outputOperator.error, errorSink); + outputOperator.activate(context); List<TestPOJOEvent> events = Lists.newArrayList(); for (int i = 0; i < 10; i++) { events.add(new TestPOJOEvent(i, "test" + i)); } + events.add(new TestPOJOEvent(0, "test0")); // Records violating PK constraint + events.add(new TestPOJOEvent(2, "test2")); // Records violating PK constraint + events.add(new TestPOJOEvent(10, "test10")); // Clean record + events.add(new TestPOJOEvent(11, "test11")); // Clean record + events.add(new TestPOJOEvent(3, "test3")); // Records violating PK constraint + events.add(new TestPOJOEvent(12, "test12")); // Clean record outputOperator.beginWindow(0); for (TestPOJOEvent event : events) { @@ -441,11 +493,15 @@ public class JdbcOperatorTest } outputOperator.endWindow(); - Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + Assert.assertEquals("rows in db", 13, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size()); } /** * This test will assume direct mapping for POJO fields to DB columns + * Nullable DB field missing in POJO + * name1 field, which is nullable in DB is missing from POJO + * POJO(id, name) -> DB(id, name1) */ @Test public void testJdbcPojoInsertOutputOperatorNullName() @@ -487,7 +543,108 @@ public class JdbcOperatorTest outputOperator.endWindow(); Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); - Assert.assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); + Assert.assertEquals("null name rows in db", 10, + outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns. + * Non-Nullable DB field missing in POJO + * id1 field which is non-nullable in DB is missing from POJO + * POJO(id, name) -> DB(id1, name) + */ + @Test + public void testJdbcPojoInsertOutputOperatorNullId() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + boolean exceptionOccurred = false; + try { + outputOperator.activate(context); + } catch (Exception e) { + exceptionOccurred = true; + Assert.assertTrue(e instanceof RuntimeException); + Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found in pojo")); + } + Assert.assertTrue(exceptionOccurred); + } + + @Test + public void 
testJdbcPojoOutputOperatorMerge() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJONonInsertOutputOperator updateOperator = new TestPOJONonInsertOutputOperator(); + updateOperator.setBatchSize(3); + + updateOperator.setStore(transactionalStore); + + updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES (?, ?)) AS FOO(id, name) " + + "ON T.id = FOO.id " + + "WHEN MATCHED THEN UPDATE SET name = FOO.name " + + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);"); + + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER)); + fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR)); + updateOperator.setFieldInfos(fieldInfos); + updateOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + updateOperator.input.setup(tpc); + + updateOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + for (int i = 0; i < 5; i++) { + events.add(new TestPOJOEvent(i, "test" + 100)); + } + + updateOperator.getDistinctNonUnique(); + updateOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + updateOperator.input.process(event); + } + 
updateOperator.endWindow(); + + // Expect 10 unique ids: 0 - 9 + Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore()); + // Expect 6 unique name: test-100, test-5, test-6, test-7, test-8, test-9 + Assert.assertEquals("rows in db", 6, updateOperator.getDistinctNonUnique()); } @Test
