Repository: apex-malhar
Updated Branches:
  refs/heads/master b42d8e741 -> a37869e9a


APEXMALHAR-2513 fixes for JdbcPollOperator

Fix result set extraction to use column index.
Exit static partitions after work is done.
Propagate all exceptions from poller task.
Allow for override of DSLContext.
Add comment regarding SQL dialect support.
Fix poller example properties.
Documentation fixes.
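The result set fix in the first bullet replaces name-based lookups in `getTuple(ResultSet)` with positional access over the selected columns. The extraction logic can be sketched stand-alone as follows (plain Java; `RowToCsv` and `toCsv` are hypothetical names, and an `Object[]` stands in for a fetched row, since a live `ResultSet` needs a database connection):

```java
public class RowToCsv {
  // Hypothetical helper mirroring the loop in getTuple(): visit every column
  // by position and join the values with commas. Note that with a real
  // ResultSet, rs.getObject(i) is 1-based, hence the i + 1 in the commit.
  static String toCsv(Object[] row) {
    StringBuilder resultTuple = new StringBuilder();
    for (int i = 0; i < row.length; i++) {
      resultTuple.append(row[i]).append(",");
    }
    return resultTuple.substring(0, resultTuple.length() - 1); // remove last comma
  }

  public static void main(String[] args) {
    System.out.println(toCsv(new Object[]{1, "John", 1000})); // 1,John,1000
  }
}
```

Iterating by index over `getMetaData().getColumnCount()` keeps the output aligned with whatever columns the range query actually selected, instead of depending on a separately configured column list.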


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a37869e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a37869e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a37869e9

Branch: refs/heads/master
Commit: a37869e9a253226dcd9ba55014d6dbcd8d95a308
Parents: b42d8e7
Author: Thomas Weise <[email protected]>
Authored: Sun Jul 2 17:00:34 2017 -0700
Committer: Thomas Weise <[email protected]>
Committed: Thu Jul 6 10:28:01 2017 -0700

----------------------------------------------------------------------
 docs/operators/jdbcPollInputOperator.md         |  23 ++--
 .../META-INF/properties-PollJdbcToHDFSApp.xml   |  24 ++--
 .../db/jdbc/AbstractJdbcPollInputOperator.java  | 109 +++++++++++--------
 .../lib/db/jdbc/JdbcPollInputOperator.java      |   5 +-
 .../db/jdbc/JdbcPojoPollableOpeartorTest.java   |  11 +-
 5 files changed, 98 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/docs/operators/jdbcPollInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/jdbcPollInputOperator.md b/docs/operators/jdbcPollInputOperator.md
index aa1d107..822ace5 100644
--- a/docs/operators/jdbcPollInputOperator.md
+++ b/docs/operators/jdbcPollInputOperator.md
@@ -26,10 +26,10 @@ JDBC Poller Input operator addresses the first issue with an asynchronous worker
 Assumption is that there is an ordered column using which range queries can be formed. That means database has a column or combination of columns which has unique constraint as well as every newly inserted record should have column value more than max value in that column, as we poll only appended records.
 
 ## Use cases
-1. Scan huge database tables to either copy to other database or process it using **Apache Apex**. An example application using this operator to copy database contents to HDFS is available in the [examples repository](https://github.com/DataTorrent/examples/tree/master/tutorials/jdbcIngest). Look for "PollJdbcToHDFSApp" for example of this particular operator.
+1. Ingest large database tables. An example application that copies database contents to HDFS is available [here](https://github.com/apache/apex-malhar/blob/master/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java).
 
 ## How to Use?
-The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using kyro). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple.
+The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or an appropriate concrete java class, having no-argument constructor so that it can be serialized using Kryo). Also implement a couple of abstract methods: `getTuple(ResultSet)` to convert database rows to objects of concrete class and `emitTuple(T)` to emit the tuple.
 
 In principle, no ports need be defined in the rare case that the operator simply writes tuples to some external sink or merely maintains aggregated statistics. But in most common scenarios, the tuples need to be sent to one or more downstream operators for additional processing such as parsing, enrichment or aggregation; in such cases, appropriate output ports are defined and the emitTuple(T) implementation dispatches tuples to the desired output ports.
 
@@ -46,7 +46,7 @@ Only static partitioning is supported for JDBC Poller Input Operator. Configure
 
 ```xml
   <property>
-    <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+    <name>apex.operator.{OperatorName}.prop.partitionCount</name>
     <value>4</value>
   </property>
 ```
@@ -62,8 +62,7 @@ Not supported.
 1. Operator location: ***malhar-library***
 2. Available since: ***3.5.0***
 3. Operator state: ***Evolving***
-4. Java Packages:
-    * Operator: ***[com.datatorrent.lib.db.jdbc.AbstractJdbcPollInputOperator](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.html)***
+4. Java Packages: ***[AbstractJdbcPollInputOperator](https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.7/com/datatorrent/lib/db/jdbc/package-summary.html)***
 
 JDBC Poller is **idempotent**, **fault-tolerant** and **statically partitionable**.
 
@@ -99,27 +98,27 @@ Of these only `store` properties, `tableName`, `columnsExpression` and `key` are
 
 ```xml
 <property>
-  <name>dt.operator.{OperatorName}.prop.tableName</name>
+  <name>apex.operator.{OperatorName}.prop.tableName</name>
   <value>mytable</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.columnsExpression</name>
+  <name>apex.operator.{OperatorName}.prop.columnsExpression</name>
   <value>column1,column2,column4</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.key</name>
+  <name>apex.operator.{OperatorName}.prop.key</name>
   <value>keycolumn</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.databaseDriver</name>
+  <name>apex.operator.{OperatorName}.prop.store.databaseDriver</name>
   <value>com.mysql.jdbc.Driver</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.databaseUrl</name>
+  <name>apex.operator.{OperatorName}.prop.store.databaseUrl</name>
   <value>jdbc:mysql://localhost:3306/mydb</value>
 </property>
 <property>
-  <name>dt.operator.{OperatorName}.prop.store.connectionProps</name>
+  <name>apex.operator.{OperatorName}.prop.store.connectionProps</name>
   <value>user:myuser,password:mypassword</value>
 </property>
 ```
@@ -147,7 +146,7 @@ This operator defines following additional properties beyond those defined in th
 
 | **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
 | -------- | ----------- | ---- | ------------------ | ------------- |
-| *fieldInfos*| [FieldInfo](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/util/FieldInfo.html) maps a store column to a POJO field name.| List | Yes | N/A |
+| *fieldInfos*| Maps columns to POJO field names.| List | Yes | N/A |
 
 #### Platform Attributes that influence operator behavior
 | **Attribute** | **Description** | **Type** | **Mandatory** |
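The static partitioning described in this documentation divides the table's row count into contiguous offset ranges, one per partition, based on the ordered key column. Below is a minimal plain-Java sketch of that splitting, using the same arithmetic as the operator's `getPartitionedQueryRangeMap`; the class and helper names here are illustrative, not part of the library.

```java
import java.util.HashMap;
import java.util.Map;

public class RangeSplitSketch {
  // Hypothetical helper mirroring how the operator turns a total row count
  // into one [lowerOffset, upperOffset) pair per static partition.
  static Map<Integer, int[]> splitRanges(int rowCount, int partitions) {
    Map<Integer, int[]> map = new HashMap<>();
    int events = rowCount / partitions;
    for (int i = 0, lower = 0, upper = events; i < partitions - 1; i++, lower += events, upper += events) {
      map.put(i, new int[]{lower, upper});
    }
    // the last partition takes the remainder, up to the full row count
    map.put(partitions - 1, new int[]{events * (partitions - 1), rowCount});
    return map;
  }

  public static void main(String[] args) {
    Map<Integer, int[]> m = splitRanges(10, 3);
    for (int i = 0; i < 3; i++) {
      System.out.println(i + ": " + m.get(i)[0] + "-" + m.get(i)[1]);
    }
  }
}
```

With 10 rows and 3 partitions this yields ranges 0-3, 3-6 and 6-10, so no rows are skipped when the division is not exact.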

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
----------------------------------------------------------------------
diff --git a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
index c75c7b6..e24c52e 100644
--- a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
+++ b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
@@ -23,68 +23,68 @@
     <!-- Static partitioning, specify the partition count, this decides how 
         many ranges would be initiated -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.partitionCount</name>
+        <name>apex.operator.JdbcPoller.prop.partitionCount</name>
         <value>2</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.databaseDriver</name>
+        <name>apex.operator.JdbcPoller.prop.store.databaseDriver</name>
         <!-- replace value with your jbdc driver -->
         <value>org.hsqldb.jdbcDriver</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.databaseUrl</name>
+        <name>apex.operator.JdbcPoller.prop.store.databaseUrl</name>
         <!-- replace value with your jbdc  url -->
         <value>jdbc:hsqldb:mem:test</value>
     </property>
 
     <!--property>
-        <name>dt.application.operator.JdbcPoller.prop.store.userName</name>
+        <name>apex.operator.JdbcPoller.prop.store.connectionProperties(user)</name>
         <value>username</value>
     </property>
     
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.store.password</name>
+        <name>apex.operator.JdbcPoller.prop.store.connectionProperties(password)</name>
         <value>password</value>
     </property-->
 
     <!-- Batch size for poller -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.batchSize</name>
+        <name>apex.operator.JdbcPoller.prop.batchSize</name>
         <value>50</value>
     </property>
 
     <!-- look-up key for forming range queries, this would be the column name 
         on which the table is sorted -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.key</name>
+        <name>apex.operator.JdbcPoller.prop.key</name>
         <value>ACCOUNT_NO</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.columnsExpression</name>
+        <name>apex.operator.JdbcPoller.prop.columnsExpression</name>
         <value>ACCOUNT_NO,NAME,AMOUNT</value>
     </property>
     <property>
-      <name>dt.application.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
+      <name>apex.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
       <value>org.apache.apex.examples.JdbcIngest.PojoEvent</value>
     </property>
 
     <!-- Table name -->
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.tableName</name>
+        <name>apex.operator.JdbcPoller.prop.tableName</name>
         <value>test_event_table</value>
     </property>
 
     <property>
-        <name>dt.application.operator.JdbcPoller.prop.pollInterval</name>
+        <name>apex.operator.JdbcPoller.prop.pollInterval</name>
         <value>1000</value>
     </property>
 
     <!-- Output folder for HDFS output operator -->
     <property>
-        <name>dt.application.operator.Writer.filePath</name>
+        <name>apex.operator.Writer.filePath</name>
         <value>/tmp/test/output</value>
     </property>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 86a443c..504f7fa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
@@ -39,7 +39,6 @@ import javax.validation.constraints.NotNull;
 import org.jooq.Condition;
 import org.jooq.DSLContext;
 import org.jooq.Field;
-import org.jooq.SelectField;
 import org.jooq.conf.ParamType;
 import org.jooq.impl.DSL;
 import org.jooq.tools.jdbc.JDBCUtils;
@@ -60,10 +59,10 @@ import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.KryoCloneUtils;
-import com.datatorrent.netlet.util.DTThrowable;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
@@ -83,10 +82,14 @@ import static org.jooq.impl.DSL.field;
  * Only newly added data will be fetched by the polling jdbc partition, also
  * assumption is rows won't be added or deleted in middle during scan.
  *
+ * The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}.
+ * Note that some of the dialects (including Oracle) are only available in commercial
+ * jOOQ distributions. If the dialect is not available, a generic translation is applied,
+ * you can post-process the generated SQL by overriding {@link #buildRangeQuery(int, int)}.
  *
  * @displayName Jdbc Polling Input Operator
  * @category Input
- * @tags database, sql, jdbc, partitionable, idepotent, pollable
+ * @tags database, sql, jdbc, partitionable, idempotent, pollable
  *
  * @since 3.5.0
  */
@@ -115,7 +118,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   @NotNull
   private String tableName;
-  @NotNull
   private String columnsExpression;
   @NotNull
   private String key;
@@ -126,11 +128,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   protected KeyValPair<Integer, Integer> rangeQueryPair;
   protected Integer lowerBound;
   protected Integer lastEmittedRow;
-  private transient int operatorId;
-  private transient DSLContext create;
+  protected transient DSLContext dslContext;
   private transient volatile boolean execute;
   private transient ScheduledExecutorService scanService;
-  private transient AtomicReference<Throwable> threadException;
+  private transient ScheduledFuture<?> pollFuture;
   protected transient boolean isPolled;
   protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
@@ -150,19 +151,18 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void setup(OperatorContext context)
   {
     super.setup(context);
-    intializeDSLContext();
+    dslContext = createDSLContext();
     if (scanService == null) {
       scanService = Executors.newScheduledThreadPool(1);
     }
     execute = true;
     emitQueue = new LinkedBlockingDeque<>(queueCapacity);
-    operatorId = context.getId();
     windowManager.setup(context);
   }
 
-  private void intializeDSLContext()
+  protected DSLContext createDSLContext()
   {
-    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
+    return DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
   }
 
   @Override
@@ -172,7 +172,17 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID
         || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
-      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+      schedulePollTask();
+    }
+  }
+
+  private void schedulePollTask()
+  {
+    if (isPollerPartition) {
+      pollFuture = scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+    } else {
+      LOG.debug("Scheduling for one time execution.");
+      pollFuture = scanService.schedule(new DBPoller(), 0, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -203,8 +213,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         replay(currentWindowId);
         return;
       } catch (SQLException e) {
-        LOG.error("Exception in replayed windows", e);
-        throw new RuntimeException(e);
+        throw new RuntimeException("Replay failed", e);
       }
     }
     if (isPollerPartition) {
@@ -219,6 +228,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
       return;
     }
+
     int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
     while (pollSize-- > 0) {
       T obj = emitQueue.poll();
@@ -234,6 +244,21 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   @Override
   public void endWindow()
   {
+    if (pollFuture != null && (pollFuture.isCancelled() || pollFuture.isDone())) {
+      try {
+        pollFuture.get();
+      } catch (Exception e) {
+        throw new RuntimeException("JDBC thread failed", e);
+      }
+
+      if (isPollerPartition) {
+        throw new IllegalStateException("poller task terminated");
+      } else {
+        // exit static query partition
+        BaseOperator.shutdown();
+      }
+    }
+
     try {
       if (currentWindowId > windowManager.getLargestCompletedWindow()) {
         currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
@@ -242,22 +267,19 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     } catch (IOException e) {
       throw new RuntimeException("saving recovery", e);
     }
-    if (threadException != null) {
-      store.disconnect();
-      DTThrowable.rethrow(threadException.get());
-    }
   }
 
   @Override
   public void deactivate()
   {
+    execute = false;
     scanService.shutdownNow();
     store.disconnect();
   }
 
   /**
-   * Function to insert results of a query in emit Queue
-   * @param preparedStatement PreparedStatement to execute the query and store the results in emit Queue.
+   * Execute the query and transfer results to the emit queue.
+   * @param preparedStatement PreparedStatement to execute the query and fetch results.
    */
   protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
   {
@@ -293,14 +315,12 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
         insertDbDataInQueue(ps);
       }
       isPolled = true;
-    } catch (SQLException ex) {
-      execute = false;
-      threadException = new AtomicReference<Throwable>(ex);
-    } catch (InterruptedException e) {
-      threadException = new AtomicReference<Throwable>(e);
+    } catch (SQLException | InterruptedException ex) {
+      throw new RuntimeException(ex);
     } finally {
       if (!isPollerPartition) {
-        store.disconnect();
+        LOG.debug("fetched all records, marking complete.");
+        execute = false;
       }
     }
     isPolled = true;
@@ -310,7 +330,6 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
 
   protected void replay(long windowId) throws SQLException
   {
-
     try {
       @SuppressWarnings("unchecked")
       MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
@@ -318,14 +337,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       if (recoveredData != null && shouldReplayWindow(recoveredData)) {
         LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
             recoveredData.right);
-
         ps = store.getConnection().prepareStatement(
             buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
             CONCUR_READ_ONLY);
         LOG.info("Query formed to recover data - {}", ps.toString());
-
         emitReplayedTuples(ps);
-
       }
 
       if (currentWindowId == windowManager.getLargestCompletedWindow()) {
@@ -347,7 +363,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
               lastOffset = bound;
             }
           }
-          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
+          schedulePollTask();
         } catch (SQLException e) {
           throw new RuntimeException(e);
         }
@@ -400,13 +416,13 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
       Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
   {
-    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
+    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>(
         getPartitionCount());
 
     HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
     try {
       store.connect();
-      intializeDSLContext();
+      dslContext = createDSLContext();
       partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
     } catch (SQLException e) {
       LOG.error("Exception in initializing the partition range", e);
@@ -427,11 +443,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       } else {
         // The upper bound for the n+1 partition is set to null since its a pollable partition
         int partitionKey = partitionToRangeMap.get(i - 1).getValue();
-        jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null);
+        jdbcPoller.rangeQueryPair = new KeyValPair<>(partitionKey, null);
         jdbcPoller.lastEmittedRow = partitionKey;
         jdbcPoller.isPollerPartition = true;
       }
-      newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
+      newPartitions.add(new DefaultPartition<>(jdbcPoller));
     }
 
     return newPartitions;
@@ -457,10 +473,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
     int events = (rowCount / partitions);
     for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
-      partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset));
+      partitionToQueryMap.put(i, new KeyValPair<>(lowerOffset, upperOffset));
     }
 
-    partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount));
+    partitionToQueryMap.put(partitions - 1, new KeyValPair<>(events * (partitions - 1), rowCount));
     LOG.info("Partition map - " + partitionToQueryMap.toString());
     return partitionToQueryMap;
   }
@@ -476,7 +492,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     if (getWhereCondition() != null) {
       condition = condition.and(getWhereCondition());
     }
-    int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
+    int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
     return recordsCount;
   }
 
@@ -496,10 +512,10 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
       for (String column : getColumnsExpression().split(",")) {
         columns.add(field(column));
       }
-      sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition)
+      sqlQuery = dslContext.select(columns).from(getTableName()).where(condition)
           .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
     } else {
-      sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
+      sqlQuery = dslContext.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
           .offset(offset).getSQL(ParamType.INLINED);
     }
     LOG.info("DSL Query: " + sqlQuery);
@@ -515,10 +531,15 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
     @Override
     public void run()
     {
-      while (execute) {
-        if ((isPollerPartition && !isPolled) || !isPollerPartition) {
-          pollRecords();
+      try {
+        LOG.debug("Entering poll task");
+        while (execute) {
+          if ((isPollerPartition && !isPolled) || !isPollerPartition) {
+            pollRecords();
+          }
         }
+      } finally {
+        LOG.debug("Exiting poll task");
       }
     }
   }
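The scheduling changes above replace the shared `AtomicReference` with a `ScheduledFuture`, so a completed or failed poll task can be detected from `endWindow()` via `get()`, which surfaces any exception the task threw. A self-contained, JDK-only sketch of that pattern follows; the class name, messages and the simulated failure are illustrative, not the operator code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PollTaskSketch {
  // One-shot task, as scheduled for a static (non-poller) partition; a poller
  // partition would use scanService.scheduleAtFixedRate(...) instead.
  static String runOnce() throws InterruptedException {
    ScheduledExecutorService scanService = Executors.newScheduledThreadPool(1);
    try {
      ScheduledFuture<?> pollFuture = scanService.schedule(
          () -> { throw new RuntimeException("simulated SQL failure"); },
          0, TimeUnit.MILLISECONDS);
      try {
        pollFuture.get(); // returns normally when the task finished cleanly
        return "poll task completed";
      } catch (ExecutionException e) {
        // the operator rethrows this from endWindow() so the platform sees it
        return "poller task failed: " + e.getCause().getMessage();
      }
    } finally {
      scanService.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runOnce());
  }
}
```

Because the exception travels through the future rather than a side channel, nothing is lost if the task dies between windows; the next `endWindow()` call observes the failure.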

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
index 9a76103..eb72431 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -68,8 +68,9 @@ public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String>
   {
     StringBuilder resultTuple = new StringBuilder();
     try {
-      for (String obj : emitColumns) {
-        resultTuple.append(rs.getObject(obj) + ",");
+      int columnCount = rs.getMetaData().getColumnCount();
+      for (int i = 0; i < columnCount; i++) {
+        resultTuple.append(rs.getObject(i + 1) + ",");
       }
       return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a37869e9/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index bde22f5..c01804f 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -87,7 +87,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   }
 
   @Test
-  public void testDBPoller() throws InterruptedException
+  public void testDBPoller() throws Exception
   {
     insertEvents(10, true, 0);
 
@@ -179,7 +179,7 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
   {
     int operatorId = 1;
     when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
-    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
+    when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<>(0, 4));
 
     insertEvents(10, true, 0);
 
@@ -207,7 +207,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
     inputOperator.setFetchSize(100);
     inputOperator.setBatchSize(100);
     inputOperator.lastEmittedRow = 0; //setting as not calling partition logic
-    inputOperator.rangeQueryPair = new KeyValPair<Integer, Integer>(0, 8);
+    inputOperator.isPollerPartition = true;
+    inputOperator.rangeQueryPair = new KeyValPair<>(0, 8);
 
     inputOperator.outputPort.setup(tpc);
     inputOperator.setScheduledExecutorService(mockscheduler);
@@ -219,15 +220,17 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
     inputOperator.outputPort.setSink(sink);
     inputOperator.beginWindow(0);
     verify(mockscheduler, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+    verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
     inputOperator.emitTuples();
     inputOperator.endWindow();
     inputOperator.beginWindow(1);
     verify(mockscheduler, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+    verify(mockscheduler, times(0)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
 
   }
 
   @Test
-  public void testDBPollerExtraField() throws InterruptedException
+  public void testDBPollerExtraField() throws Exception
   {
     insertEvents(10, true, 0);
 
