Repository: apex-malhar Updated Branches: refs/heads/master b42d8e741 -> a37869e9a
APEXMALHAR-2513 fixes for JdbcPollOperator Fix result set extraction to use column index. Exit static partitions after work is done. Propagate all exceptions from poller task. Allow for override of DSLContext. Add comment regarding SQL dialect support. Fix poller example properties. Documentation fixes. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a37869e9 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a37869e9 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a37869e9 Branch: refs/heads/master Commit: a37869e9a253226dcd9ba55014d6dbcd8d95a308 Parents: b42d8e7 Author: Thomas Weise <[email protected]> Authored: Sun Jul 2 17:00:34 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Thu Jul 6 10:28:01 2017 -0700 ---------------------------------------------------------------------- docs/operators/jdbcPollInputOperator.md | 23 ++-- .../META-INF/properties-PollJdbcToHDFSApp.xml | 24 ++-- .../db/jdbc/AbstractJdbcPollInputOperator.java | 109 +++++++++++-------- .../lib/db/jdbc/JdbcPollInputOperator.java | 5 +- .../db/jdbc/JdbcPojoPollableOpeartorTest.java | 11 +- 5 files changed, 98 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/docs/operators/jdbcPollInputOperator.md ---------------------------------------------------------------------- diff --git a/docs/operators/jdbcPollInputOperator.md b/docs/operators/jdbcPollInputOperator.md index aa1d107..822ace5 100644 --- a/docs/operators/jdbcPollInputOperator.md +++ b/docs/operators/jdbcPollInputOperator.md @@ -26,10 +26,10 @@ JDBC Poller Input operator addresses the first issue with an asynchronous worker Assumption is that there is an ordered column using which range queries can be formed. That means database has a column or combination of columns which has unique constraint as well as every newly inserted record should have column value more than max value in that column, as we poll only appended records. ## Use cases -1. Scan huge database tables to either copy to other database or process it using **Apache Apex**. An example application using this operator to copy database contents to HDFS is available in the [examples repository](https://github.com/DataTorrent/examples/tree/master/tutorials/jdbcIngest). Look for "PollJdbcToHDFSApp" for example of this particular operator. +1. Ingest large database tables. An example application that copies database contents to HDFS is available [here](https://github.com/apache/apex-malhar/blob/master/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java). ## How to Use? -The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using kyro). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple. +The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using Kryo). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple. In principle, no ports need be defined in the rare case that the operator simply writes tuples to some external sink or merely maintains aggregated statistics. But in most common scenarios, the tuples need to be sent to one or more downstream operators for additional processing such as parsing, enrichment or aggregation; in such cases, appropriate output ports are defined and the emitTuple(T) implementation dispatches tuples to the desired output ports. @@ -46,7 +46,7 @@ Only static partitioning is supported for JDBC Poller Input Operator. Configure ```xml <property> - <name>dt.operator.{OperatorName}.prop.partitionCount</name> + <name>apex.operator.{OperatorName}.prop.partitionCount</name> <value>4</value> </property> ``` @@ -62,8 +62,7 @@ Not supported. 1. Operator location: ***malhar-library*** 2. Available since: ***3.5.0*** 3. Operator state: ***Evolving*** -4. Java Packages: - * Operator: ***[com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.html)*** +4. Java Packages: ***[AbstractJdbcPollInputOperator](https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.7/com/datatorrent/lib/db/jdbc/package-summary.html)*** JDBC Poller is **idempotent**, **fault-tolerant** and **statically partitionable**. @@ -99,27 +98,27 @@ Of these only `store` properties, `tableName`, `columnsExpression` and `key` are ```xml <property> - <name>dt.operator.{OperatorName}.prop.tableName</name> + <name>apex.operator.{OperatorName}.prop.tableName</name> <value>mytable</value> </property> <property> - <name>dt.operator.{OperatorName}.prop.columnsExpression</name> + <name>apex.operator.{OperatorName}.prop.columnsExpression</name> <value>column1,column2,column4</value> </property> <property> - <name>dt.operator.{OperatorName}.prop.key</name> + <name>apex.operator.{OperatorName}.prop.key</name> <value>keycolumn</value> </property> <property> - <name>dt.operator.{OperatorName}.prop.store.databaseDriver</name> + <name>apex.operator.{OperatorName}.prop.store.databaseDriver</name> <value>com.mysql.jdbc.Driver</value> </property> <property> - <name>dt.operator.{OperatorName}.prop.store.databaseUrl</name> + <name>apex.operator.{OperatorName}.prop.store.databaseUrl</name> <value>jdbc:mysql://localhost:3306/mydb</value> </property> <property> - <name>dt.operator.{OperatorName}.prop.store.connectionProps</name> + <name>apex.operator.{OperatorName}.prop.store.connectionProps</name> <value>user:myuser,password:mypassword</value> </property> ``` @@ -147,7 +146,7 @@ This operator defines following additional properties beyond those defined in th | **Property** | **Description** | **Type** | **Mandatory** | **Default Value** | | -------- | ----------- | ---- | ------------------ | ------------- | -| *fieldInfos*| [FieldInfo](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/util/FieldInfo.html) maps a store column to a POJO field name.| List | Yes | N/A | +| *fieldInfos*| Maps columns to POJO field names.| List | Yes | N/A | #### Platform Attributes that influence operator behavior | **Attribute** | **Description** | **Type** | **Mandatory** | http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml index c75c7b6..e24c52e 100644 --- a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml +++ b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml @@ -23,68 +23,68 @@ <!-- Static partitioning, specify the partition count, this decides how many ranges would be initiated --> <property> - <name>dt.application.operator.JdbcPoller.prop.partitionCount</name> + <name>apex.operator.JdbcPoller.prop.partitionCount</name> <value>2</value> </property> <property> - <name>dt.application.operator.JdbcPoller.prop.store.databaseDriver</name> + <name>apex.operator.JdbcPoller.prop.store.databaseDriver</name> <!-- replace value with your jbdc driver --> <value>org.hsqldb.jdbcDriver</value> </property> <property> - <name>dt.application.operator.JdbcPoller.prop.store.databaseUrl</name> + <name>apex.operator.JdbcPoller.prop.store.databaseUrl</name> <!-- replace value with your jbdc url --> <value>jdbc:hsqldb:mem:test</value> </property> <!--property> - <name>dt.application.operator.JdbcPoller.prop.store.userName</name> + <name>apex.operator.JdbcPoller.prop.store.connectionProperties(user)</name> <value>username</value> </property> <property> - <name>dt.application.operator.JdbcPoller.prop.store.password</name> + <name>apex.operator.JdbcPoller.prop.store.connectionProperties(password)</name> <value>password</value> </property--> <!-- Batch size for poller --> <property> - <name>dt.application.operator.JdbcPoller.prop.batchSize</name> + <name>apex.operator.JdbcPoller.prop.batchSize</name> <value>50</value> </property> <!-- look-up key for forming range queries, this would be the column name on which the table is sorted --> <property> - <name>dt.application.operator.JdbcPoller.prop.key</name> + <name>apex.operator.JdbcPoller.prop.key</name> <value>ACCOUNT_NO</value> </property> <property> - <name>dt.application.operator.JdbcPoller.prop.columnsExpression</name> + <name>apex.operator.JdbcPoller.prop.columnsExpression</name> <value>ACCOUNT_NO,NAME,AMOUNT</value> </property> <property> - <name>dt.application.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name> + <name>apex.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name> <value>org.apache.apex.examples.JdbcIngest.PojoEvent</value> </property> <!-- Table name --> <property> - <name>dt.application.operator.JdbcPoller.prop.tableName</name> + <name>apex.operator.JdbcPoller.prop.tableName</name> <value>test_event_table</value> </property> <property> - <name>dt.application.operator.JdbcPoller.prop.pollInterval</name> + <name>apex.operator.JdbcPoller.prop.pollInterval</name> <value>1000</value> </property> <!-- Output folder for HDFS output operator --> <property> - <name>dt.application.operator.Writer.filePath</name> + <name>apex.operator.Writer.filePath</name> <value>/tmp/test/output</value> </property> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java index 86a443c..504f7fa 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -30,8 +30,8 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -39,7 +39,6 @@ import javax.validation.constraints.NotNull; import org.jooq.Condition; import org.jooq.DSLContext; import org.jooq.Field; -import org.jooq.SelectField; import org.jooq.conf.ParamType; import org.jooq.impl.DSL; import org.jooq.tools.jdbc.JDBCUtils; @@ -60,10 +59,10 @@ import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.Partitioner; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.db.AbstractStoreInputOperator; import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.KryoCloneUtils; -import com.datatorrent.netlet.util.DTThrowable; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; @@ -83,10 +82,14 @@ import static org.jooq.impl.DSL.field; * Only newly added data will be fetched by the polling jdbc partition, also * assumption is rows won't be added or deleted in middle during scan. * + * The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}. + * Note that some of the dialects (including Oracle) are only available in commercial + * jOOQ distributions. If the dialect is not available, a generic translation is applied, + * you can post-process the generated SQL by overriding {@link #buildRangeQuery(int, int)}. * * @displayName Jdbc Polling Input Operator * @category Input - * @tags database, sql, jdbc, partitionable, idepotent, pollable + * @tags database, sql, jdbc, partitionable, idempotent, pollable * * @since 3.5.0 */ @@ -115,7 +118,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu @NotNull private String tableName; - @NotNull private String columnsExpression; @NotNull private String key; @@ -126,11 +128,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu protected KeyValPair<Integer, Integer> rangeQueryPair; protected Integer lowerBound; protected Integer lastEmittedRow; - private transient int operatorId; - private transient DSLContext create; + protected transient DSLContext dslContext; private transient volatile boolean execute; private transient ScheduledExecutorService scanService; - private transient AtomicReference<Throwable> threadException; + private transient ScheduledFuture<?> pollFuture; protected transient boolean isPolled; protected transient LinkedBlockingDeque<T> emitQueue; protected transient PreparedStatement ps; @@ -150,19 +151,18 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void setup(OperatorContext context) { super.setup(context); - intializeDSLContext(); + dslContext = createDSLContext(); if (scanService == null) { scanService = Executors.newScheduledThreadPool(1); } execute = true; emitQueue = new LinkedBlockingDeque<>(queueCapacity); - operatorId = context.getId(); windowManager.setup(context); } - private void intializeDSLContext() + protected DSLContext createDSLContext() { - create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); + return DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); } @Override @@ -172,7 +172,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu long largestRecoveryWindow = windowManager.getLargestCompletedWindow(); if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { - scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + schedulePollTask(); + } + } + + private void schedulePollTask() + { + if (isPollerPartition) { + pollFuture = scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + } else { + LOG.debug("Scheduling for one time execution."); + pollFuture = scanService.schedule(new DBPoller(), 0, TimeUnit.MILLISECONDS); } } @@ -203,8 +213,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu replay(currentWindowId); return; } catch (SQLException e) { - LOG.error("Exception in replayed windows", e); - throw new RuntimeException(e); + throw new RuntimeException("Replay failed", e); } } if (isPollerPartition) { @@ -219,6 +228,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu if (currentWindowId <= windowManager.getLargestCompletedWindow()) { return; } + int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize; while (pollSize-- > 0) { T obj = emitQueue.poll(); @@ -234,6 +244,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu @Override public void endWindow() { + if (pollFuture != null && (pollFuture.isCancelled() || pollFuture.isDone())) { + try { + pollFuture.get(); + } catch (Exception e) { + throw new RuntimeException("JDBC thread failed", e); + } + + if (isPollerPartition) { + throw new IllegalStateException("poller task terminated"); + } else { + // exit static query partition + BaseOperator.shutdown(); + } + } + try { if (currentWindowId > windowManager.getLargestCompletedWindow()) { currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow); @@ -242,22 +267,19 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu } catch (IOException e) { throw new RuntimeException("saving recovery", e); } - if (threadException != null) { - store.disconnect(); - DTThrowable.rethrow(threadException.get()); - } } @Override public void deactivate() { + execute = false; scanService.shutdownNow(); store.disconnect(); } /** - * Function to insert results of a query in emit Queue - * @param preparedStatement PreparedStatement to execute the query and store the results in emit Queue. + * Execute the query and transfer results to the emit queue. + * @param preparedStatement PreparedStatement to execute the query and fetch results. */ protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException { @@ -293,14 +315,12 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu insertDbDataInQueue(ps); } isPolled = true; - } catch (SQLException ex) { - execute = false; - threadException = new AtomicReference<Throwable>(ex); - } catch (InterruptedException e) { - threadException = new AtomicReference<Throwable>(e); + } catch (SQLException | InterruptedException ex) { + throw new RuntimeException(ex); } finally { if (!isPollerPartition) { - store.disconnect(); + LOG.debug("fetched all records, marking complete."); + execute = false; } } isPolled = true; @@ -310,7 +330,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu protected void replay(long windowId) throws SQLException { - try { @SuppressWarnings("unchecked") MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId); @@ -318,14 +337,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu if (recoveredData != null && shouldReplayWindow(recoveredData)) { LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, recoveredData.right); - ps = store.getConnection().prepareStatement( buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); LOG.info("Query formed to recover data - {}", ps.toString()); - emitReplayedTuples(ps); - } if (currentWindowId == windowManager.getLargestCompletedWindow()) { @@ -347,7 +363,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu lastOffset = bound; } } - scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + schedulePollTask(); } catch (SQLException e) { throw new RuntimeException(e); } @@ -400,13 +416,13 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions( Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context) { - List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>( + List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>( getPartitionCount()); HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null; try { store.connect(); - intializeDSLContext(); + dslContext = createDSLContext(); partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount()); } catch (SQLException e) { LOG.error("Exception in initializing the partition range", e); @@ -427,11 +443,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu } else { // The upper bound for the n+1 partition is set to null since its a pollable partition int partitionKey = partitionToRangeMap.get(i - 1).getValue(); - jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null); + jdbcPoller.rangeQueryPair = new KeyValPair<>(partitionKey, null); jdbcPoller.lastEmittedRow = partitionKey; jdbcPoller.isPollerPartition = true; } - newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller)); + newPartitions.add(new DefaultPartition<>(jdbcPoller)); } return newPartitions; @@ -457,10 +473,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>(); int events = (rowCount / partitions); for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) { - partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset)); + partitionToQueryMap.put(i, new KeyValPair<>(lowerOffset, upperOffset)); } - partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount)); + partitionToQueryMap.put(partitions - 1, new KeyValPair<>(events * (partitions - 1), rowCount)); LOG.info("Partition map - " + partitionToQueryMap.toString()); return partitionToQueryMap; } @@ -476,7 +492,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu if (getWhereCondition() != null) { condition = condition.and(getWhereCondition()); } - int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class); + int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class); return recordsCount; } @@ -496,10 +512,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu for (String column : getColumnsExpression().split(",")) { columns.add(field(column)); } - sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition) + sqlQuery = dslContext.select(columns).from(getTableName()).where(condition) .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED); } else { - sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit) + sqlQuery = dslContext.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit) .offset(offset).getSQL(ParamType.INLINED); } LOG.info("DSL Query: " + sqlQuery); @@ -515,10 +531,15 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu @Override public void run() { - while (execute) { - if ((isPollerPartition && !isPolled) || !isPollerPartition) { - pollRecords(); + try { + LOG.debug("Entering poll task"); + while (execute) { + if ((isPollerPartition && !isPolled) || !isPollerPartition) { + pollRecords(); + } } + } finally { + LOG.debug("Exiting poll task"); } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java index 9a76103..eb72431 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java @@ -68,8 +68,9 @@ public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String> { StringBuilder resultTuple = new StringBuilder(); try { - for (String obj : emitColumns) { - resultTuple.append(rs.getObject(obj) + ","); + int columnCount = rs.getMetaData().getColumnCount(); + for (int i = 0; i < columnCount; i++) { + resultTuple.append(rs.getObject(i + 1) + ","); } return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java index bde22f5..c01804f 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java @@ -87,7 +87,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest } @Test - public void testDBPoller() throws InterruptedException + public void testDBPoller() throws Exception { insertEvents(10, true, 0); @@ -179,7 +179,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest { int operatorId = 1; when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L); - when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4)); + when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<>(0, 4)); insertEvents(10, true, 0); @@ -207,7 +207,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest inputOperator.setFetchSize(100); inputOperator.setBatchSize(100); inputOperator.lastEmittedRow = 0; //setting as not calling partition logic - inputOperator.rangeQueryPair = new KeyValPair<Integer, Integer>(0, 8); + inputOperator.isPollerPartition = true; + inputOperator.rangeQueryPair = new KeyValPair<>(0, 8); inputOperator.outputPort.setup(tpc); inputOperator.setScheduledExecutorService(mockscheduler); @@ -219,15 +220,17 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest inputOperator.outputPort.setSink(sink); inputOperator.beginWindow(0); verify(mockscheduler, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); inputOperator.emitTuples(); inputOperator.endWindow(); inputOperator.beginWindow(1); verify(mockscheduler, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); } @Test - public void testDBPollerExtraField() throws InterruptedException + public void testDBPollerExtraField() throws Exception { insertEvents(10, true, 0);
