Repository: apex-malhar Updated Branches: refs/heads/master 1ae14c03a -> cf896b055
APEXMALHAR-2368 code changes to optimize memory usage in JdbcPoll operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cf896b05 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cf896b05 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cf896b05 Branch: refs/heads/master Commit: cf896b055723786bfe610fe1679408db07943a0d Parents: 1ae14c0 Author: Hitesh-Scorpio <[email protected]> Authored: Tue Dec 13 19:39:43 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Mon Jan 23 14:08:23 2017 +0530 ---------------------------------------------------------------------- .../db/jdbc/AbstractJdbcPollInputOperator.java | 90 +++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cf896b05/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java index 6bd5121..86a443c 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -100,9 +100,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu private static int DEFAULT_FETCH_SIZE = 20000; private static int DEFAULT_BATCH_SIZE = 2000; private static int DEFAULT_SLEEP_TIME = 100; + private static int DEFAULT_RESULT_LIMIT = 100000; private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds private int queueCapacity = DEFAULT_QUEUE_CAPACITY; private int fetchSize = DEFAULT_FETCH_SIZE; + /** + * Parameter to limit the number 
of results to fetch in one query by the Poller partition. + */ + private int resultLimit = DEFAULT_RESULT_LIMIT; @Min(1) private int partitionCount = 1; @@ -127,13 +132,14 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu private transient ScheduledExecutorService scanService; private transient AtomicReference<Throwable> threadException; protected transient boolean isPolled; - protected transient Integer lastPolledRow; protected transient LinkedBlockingDeque<T> emitQueue; protected transient PreparedStatement ps; protected boolean isPollerPartition; protected transient MutablePair<Integer, Integer> currentWindowRecoveryState; + private transient int lastOffset; + public AbstractJdbcPollInputOperator() { currentWindowRecoveryState = new MutablePair<>(); @@ -175,8 +181,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu try { // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound if (isPollerPartition) { - ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE), - TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); + lastOffset = rangeQueryPair.getKey(); } else { ps = store.getConnection().prepareStatement( buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), @@ -203,30 +208,11 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu } } if (isPollerPartition) { - updatePollQuery(); isPolled = false; } lowerBound = lastEmittedRow; } - private void updatePollQuery() - { - if ((lastPolledRow != lastEmittedRow)) { - if (lastEmittedRow == null) { - lastPolledRow = rangeQueryPair.getKey(); - } else { - lastPolledRow = lastEmittedRow; - } - try { - ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE), - TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - } - 
@Override public void emitTuples() { @@ -269,20 +255,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu store.disconnect(); } + /** + * Function to insert results of a query in emit Queue + * @param preparedStatement PreparedStatement to execute the query and store the results in emit Queue. + */ + protected void insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException + { + preparedStatement.setFetchSize(getFetchSize()); + ResultSet result = preparedStatement.executeQuery(); + if (result.next()) { + do { + while (!emitQueue.offer(getTuple(result))) { + Thread.sleep(DEFAULT_SLEEP_TIME); + } + } while (result.next()); + result.close(); + } + preparedStatement.close(); + } + protected void pollRecords() { if (isPolled) { return; } try { - ps.setFetchSize(getFetchSize()); - ResultSet result = ps.executeQuery(); - if (result.next()) { - do { - while (!emitQueue.offer(getTuple(result))) { - Thread.sleep(DEFAULT_SLEEP_TIME); - } - } while (result.next()); + if (isPollerPartition) { + int nextOffset = getRecordsCount(); + while (lastOffset < nextOffset) { + PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit), + TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); + insertDbDataInQueue(preparedStatement); + lastOffset = lastOffset + resultLimit; + } + lastOffset = nextOffset; + } else { + insertDbDataInQueue(ps); } isPolled = true; } catch (SQLException ex) { @@ -295,6 +303,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu store.disconnect(); } } + isPolled = true; } public abstract T getTuple(ResultSet result); @@ -334,6 +343,9 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu } ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); + if (isPollerPartition) { + lastOffset = bound; + } } 
scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); } catch (SQLException e) { @@ -717,6 +729,24 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu this.key = key; } + /** + * Gets the result limit, i.e. the maximum number of results + * fetched in one query by the poller partition. + */ + public int getResultLimit() + { + return resultLimit; + } + + /** + * Sets the result limit, i.e. the maximum number of results fetched in one query by the poller partition. + * @param resultLimit maximum number of results to fetch in one query by the poller partition. + */ + public void setResultLimit(int resultLimit) + { + this.resultLimit = resultLimit; + } + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class); }
