APEXMALHAR-2172: Updates to JDBC Poll Input Operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/26fa9d78 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/26fa9d78 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/26fa9d78 Branch: refs/heads/master Commit: 26fa9d781e032fd61fbb2972a99874e77e0c509c Parents: 3316d6a Author: Priyanka Gugale <[email protected]> Authored: Tue Jul 19 12:10:08 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Thu Aug 11 15:56:42 2016 +0530 ---------------------------------------------------------------------- library/pom.xml | 5 + .../db/jdbc/AbstractJdbcPollInputOperator.java | 915 ++++++++++--------- .../lib/db/jdbc/JdbcMetaDataUtility.java | 353 ------- .../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 325 +++++++ .../lib/db/jdbc/JdbcPollInputOperator.java | 186 +--- .../lib/db/jdbc/JdbcOperatorTest.java | 627 +------------ .../lib/db/jdbc/JdbcPojoOperatorTest.java | 606 ++++++++++++ .../db/jdbc/JdbcPojoPollableOpeartorTest.java | 241 +++++ .../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 ----- 9 files changed, 1707 insertions(+), 1797 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index b8c6271..a6bdf64 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -341,6 +341,11 @@ <version>7.0.6</version> </dependency> <dependency> + <groupId>org.jooq</groupId> + <artifactId>jooq</artifactId> + <version>3.6.4</version> + </dependency> + <dependency> <groupId>org.apache.apex</groupId> <artifactId>apex-shaded-ning19</artifactId> <version>1.0.0</version> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java index ab12ed3..234e28c 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -27,11 +27,23 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.validation.constraints.Min; - +import javax.validation.constraints.NotNull; + +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.SelectField; +import org.jooq.conf.ParamType; +import org.jooq.impl.DSL; +import org.jooq.tools.jdbc.JDBCUtils; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; @@ -39,242 +51,86 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import com.google.common.annotations.VisibleForTesting; + import com.datatorrent.api.Context; 
import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Operator.ActivationListener; -import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.db.AbstractStoreInputOperator; import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.KryoCloneUtils; import com.datatorrent.netlet.util.DTThrowable; +import static java.sql.ResultSet.CONCUR_READ_ONLY; +import static java.sql.ResultSet.TYPE_FORWARD_ONLY; +import static org.jooq.impl.DSL.field; + /** - * Abstract operator for for consuming data using JDBC interface<br> - * User needs User needs to provide - * tableName,dbConnection,setEmitColumnList,look-up key <br> - * Optionally batchSize,pollInterval,Look-up key and a where clause can be given - * <br> - * This operator uses static partitioning to arrive at range queries for exactly - * once reads<br> + * Abstract operator for consuming data using JDBC interface<br> + * User needs to provide tableName, dbConnection, columnsExpression, look-up key<br> + * Optionally batchSize, pollInterval and a where clause can be given <br> + * This operator uses static partitioning to arrive at range queries for + * idempotent reads<br> * This operator will create a configured number of non-polling static * partitions for fetching the existing data in the table, and an additional * single partition for polling additive data. The assumption is that there is an - * ordered column using which range queries can be formed<br> - * If an emitColumnList is provided, please ensure that the keyColumn is the - * first column in the list<br> - * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - - * comma separated list of the emit columns eg columnA,columnB,columnC<br> - * Only newly added data which has increasing ids will be fetched by the polling - * jdbc partition + * ordered unique column using which range queries can be formed<br> * - * In the next iterations this operator would support an in-clause for - * idempotency instead of having only range query support to support non ordered - * key columns + * Only newly added data will be fetched by the polling jdbc partition; it is + * assumed that rows won't be added or deleted mid-table while a scan is in progress.
* * @displayName Jdbc Polling Input Operator * @category Input - * @tags database, sql, jdbc, partitionable,exactlyOnce + * @tags database, sql, jdbc, partitionable, idempotent, pollable */ @Evolving -public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> - implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>> +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements + ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>> { - /** - * poll interval in milliseconds - */ - private static int pollInterval = 10000; + private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024; + private static int DEFAULT_POLL_INTERVAL = 10 * 1000; + private static int DEFAULT_FETCH_SIZE = 20000; + private static int DEFAULT_BATCH_SIZE = 2000; + private static int DEFAULT_SLEEP_TIME = 100; + private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds + private int queueCapacity = DEFAULT_QUEUE_CAPACITY; + private int fetchSize = DEFAULT_FETCH_SIZE; @Min(1) private int partitionCount = 1; - protected transient int operatorId; - protected transient boolean isReplayed; - protected transient boolean isPollable; - protected int batchSize; - protected static int fetchSize = 20000; - /** - * Map of windowId to <lower bound,upper bound> of the range key - */ - protected transient MutablePair<String, String> currentWindowRecoveryState; - - /** - * size of the emit queue used to hold polled records before emit - */ - private static int queueCapacity = 4 * 1024 * 1024; + private int batchSize = DEFAULT_BATCH_SIZE; + + @NotNull + private String tableName; + @NotNull + private String columnsExpression; + @NotNull + private String key; + private String whereCondition = null; + private long currentWindowId; + private WindowDataManager windowManager; + + protected KeyValPair<Integer, Integer> rangeQueryPair; + protected Integer lowerBound; + protected Integer lastEmittedRow; + private transient int operatorId; + private transient DSLContext create; private transient volatile boolean execute; - private transient AtomicReference<Throwable> cause; - protected transient int spinMillis; - private transient OperatorContext context; - protected String tableName; - protected String key; - protected long currentWindowId; - protected KeyValPair<String, String> rangeQueryPair; - protected String lower; - protected String upper; - protected boolean recovered; - protected boolean isPolled; - protected String whereCondition = null; - protected String previousUpperBound; - protected String highestPolled; - private static final String user = ""; - private static final String password = ""; - /** - * thread to poll database - */ - private transient Thread dbPoller; - protected transient ArrayBlockingQueue<List<T>> emitQueue; + private transient ScheduledExecutorService scanService; + private transient AtomicReference<Throwable> threadException; + protected transient boolean isPolled; + protected transient Integer lastPolledRow; + protected transient LinkedBlockingDeque<T> emitQueue; protected transient PreparedStatement ps; - protected WindowDataManager windowManager; - private String emitColumnList; - - /** - * Returns the where clause - */ - public String getWhereCondition() - { - return whereCondition; - } - - /** - * Sets the where clause - */ - public void setWhereCondition(String whereCondition)
- { - this.whereCondition = whereCondition; - } - - /** - * Returns the list of columns to select from the query - */ - public String getEmitColumnList() - { - return emitColumnList; - } - - /** - * Comma separated list of columns to select from the given table - */ - public void setEmitColumnList(String emitColumnList) - { - this.emitColumnList = emitColumnList; - } - - /** - * Returns the fetchsize for getting the results - */ - public int getFetchSize() - { - return fetchSize; - } - - /** - * Sets the fetchsize for getting the results - */ - public void setFetchSize(int fetchSize) - { - this.fetchSize = fetchSize; - } - - protected abstract void pollRecords(PreparedStatement ps); - - /** - * Returns the interval for polling the queue - */ - public int getPollInterval() - { - return pollInterval; - } - - /** - * Sets the interval for polling the emit queue - */ - public void setPollInterval(int pollInterval) - { - this.pollInterval = pollInterval; - } - - /** - * Returns the capacity of the emit queue - */ - public int getQueueCapacity() - { - return queueCapacity; - } - - /** - * Sets the capacity of the emit queue - */ - public void setQueueCapacity(int queueCapacity) - { - this.queueCapacity = queueCapacity; - } - - /** - * Returns the ordered key used to generate the range queries - */ - public String getKey() - { - return key; - } - - /** - * Sets the ordered key used to generate the range queries - */ - public void setKey(String key) - { - this.key = key; - } + protected boolean isPollerPartition; - /** - * Returns the tableName which would be queried - */ - public String getTableName() - { - return tableName; - } - - /** - * Sets the tableName to query - */ - public void setTableName(String tableName) - { - this.tableName = tableName; - } - - /** - * Returns rangeQueryPair - <lowerBound,upperBound> - */ - public KeyValPair<String, String> getRangeQueryPair() - { - return rangeQueryPair; - } - - /** - * Sets the rangeQueryPair <lowerBound,upperBound> - */ - public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair) - { - this.rangeQueryPair = rangeQueryPair; - } - - /** - * Returns batchSize indicating the number of elements in emitQueue - */ - public int getBatchSize() - { - return batchSize; - } - - /** - * Sets batchSize for number of elements in the emitQueue - */ - public void setBatchSize(int batchSize) - { - this.batchSize = batchSize; - } + protected transient MutablePair<Integer, Integer> currentWindowRecoveryState; public AbstractJdbcPollInputOperator() { @@ -286,271 +142,234 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void setup(OperatorContext context) { super.setup(context); - spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + intializeDSLContext(); + if (scanService == null) { + scanService = Executors.newScheduledThreadPool(1); + } execute = true; - cause = new AtomicReference<Throwable>(); - emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity); - this.context = context; + emitQueue = new LinkedBlockingDeque<>(queueCapacity); operatorId = context.getId(); + windowManager.setup(context); + } - try { + private void intializeDSLContext() + { + create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); + } + + @Override + public void activate(OperatorContext context) + { + initializePreparedStatement(); + long largestRecoveryWindow = windowManager.getLargestRecoveryWindow(); + if (largestRecoveryWindow == Stateless.WINDOW_ID + || 
context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { + scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + } + } - //If its a range query pass upper and lower bounds - //If its a polling query pass only the lower bound - if (getRangeQueryPair().getValue() != null) { - ps = store.getConnection() - .prepareStatement( - JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(), - rangeQueryPair.getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + protected void initializePreparedStatement() + { + try { + // If it's a polling query, pass only the lower bound; if it's a range query, pass upper and lower bounds + if (isPollerPartition) { + ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE), + TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } else { ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - isPollable = true; + buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())), + TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } - } catch (SQLException e) { LOG.error("Exception in initializing the range query for a given partition", e); throw new RuntimeException(e); } - windowManager.setup(context); - LOG.debug("super setup done..."); } @Override public void beginWindow(long windowId) { currentWindowId = windowId; - - isReplayed = false; - if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { try { replay(currentWindowId); + return; } catch (SQLException e) { LOG.error("Exception in replayed windows", e); throw new RuntimeException(e); } } - - if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) { - try { - if (!isPollable && rangeQueryPair.getValue() != null) { - - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound, - rangeQueryPair.getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - } else { - String bound = null; - if (previousUpperBound == null) { - bound = getRangeQueryPair().getKey(); - } else { - bound = previousUpperBound; - } - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - isPollable = true; - } - isReplayed = false; - LOG.debug("Prepared statement after re-initialization - {} ", ps.toString()); - } catch (SQLException e) { - // TODO Auto-generated catch block - throw new RuntimeException(e); - } + if (isPollerPartition) { + updatePollQuery(); + isPolled = false; } + lowerBound = lastEmittedRow; } - //Reset the pollable query with the updated upper and lower bounds - if (isPollable) { + private void updatePollQuery() + { + if ((lastPolledRow != lastEmittedRow)) { + if (lastEmittedRow == null) { + lastPolledRow = rangeQueryPair.getKey(); + } else { + lastPolledRow = lastEmittedRow; + } try { - String bound = null; - if (previousUpperBound == null && highestPolled == null) { - bound = getRangeQueryPair().getKey(); - } else { - bound = highestPolled; - } - ps = store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound), -
java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - LOG.debug("Polling query {} {}", ps.toString(), currentWindowId); - isPolled = false; + ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE), + TYPE_FORWARD_ONLY, CONCUR_READ_ONLY); } catch (SQLException e) { throw new RuntimeException(e); } } - lower = null; - upper = null; - - //Check if a thread is already active and start only if its no - //Do not start the thread from setup, will conflict with the replay - if (dbPoller == null && !isReplayed) { - //If this is not a replayed state, reset the ps to highest read offset + 1, - //keep the upper bound as the one that was initialized after static partitioning - LOG.info("Statement when re-initialized {}", ps.toString()); - dbPoller = new Thread(new DBPoller()); - dbPoller.start(); - } } @Override public void emitTuples() { - if (isReplayed) { + if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { return; } - - List<T> tuples; - - if ((tuples = emitQueue.poll()) != null) { - for (Object tuple : tuples) { - if (lower == null) { - lower = tuple.toString(); - } - upper = tuple.toString(); - outputPort.emit((T)tuple); + int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize; + while (pollSize-- > 0) { + T obj = emitQueue.poll(); + if (obj != null) { + emitTuple(obj); } + lastEmittedRow++; } } + protected abstract void emitTuple(T tuple); + @Override public void endWindow() { try { if (currentWindowId > windowManager.getLargestRecoveryWindow()) { - if (currentWindowRecoveryState == null) { - currentWindowRecoveryState = new MutablePair<String, String>(); - } - if (lower != null && upper != null) { - previousUpperBound = upper; - isPolled = true; - } - MutablePair<String, String> windowBoundaryPair = new MutablePair<>(lower, upper); - currentWindowRecoveryState = windowBoundaryPair; + currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow); windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } } catch (IOException e) { throw new RuntimeException("saving recovery", e); } - currentWindowRecoveryState = new MutablePair<>(); - } - - public int getPartitionCount() - { - return partitionCount; + if (threadException != null) { + store.disconnect(); + DTThrowable.rethrow(threadException.get()); + } } - public void setPartitionCount(int partitionCount) + @Override + public void deactivate() { - this.partitionCount = partitionCount; + scanService.shutdownNow(); + store.disconnect(); } - @Override - public void activate(Context cntxt) + protected void pollRecords() { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID - && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) { - // If it is a replay state, don't start any threads here + if (isPolled) { return; } - } - - @Override - public void deactivate() - { try { - if (dbPoller != null && dbPoller.isAlive()) { - dbPoller.interrupt(); - dbPoller.join(); + ps.setFetchSize(getFetchSize()); + ResultSet result = ps.executeQuery(); + if (result.next()) { + do { + while (!emitQueue.offer(getTuple(result))) { + Thread.sleep(DEFAULT_SLEEP_TIME); + } + } while (result.next()); } - } catch (InterruptedException ex) { - // log and ignore, ending execution anyway - LOG.error("exception in poller thread: ", ex); - } - } - - @Override - public void handleIdleTime() - { - if (execute) { - try { - Thread.sleep(spinMillis); - } catch 
(InterruptedException ie) { - throw new RuntimeException(ie); + isPolled = true; + } catch (SQLException ex) { + execute = false; + threadException = new AtomicReference<Throwable>(ex); + } catch (InterruptedException e) { + threadException = new AtomicReference<Throwable>(e); + } finally { + if (!isPollerPartition) { + store.disconnect(); } - } else { - LOG.error("Exception: ", cause); - DTThrowable.rethrow(cause.get()); } } + public abstract T getTuple(ResultSet result); + protected void replay(long windowId) throws SQLException { - isReplayed = true; - MutablePair<String, String> recoveredData = new MutablePair<String, String>(); try { - recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId); + MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId, + windowId); - if (recoveredData != null) { - //skip the window and return if there was no incoming data in the window - if (recoveredData.left == null || recoveredData.right == null) { - return; - } + if (recoveredData != null && shouldReplayWindow(recoveredData)) { + LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, + recoveredData.right); - if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) { - LOG.info("Matched so returning"); - return; - } - - JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator(); - jdbcPoller.setStore(store); - jdbcPoller.setKey(getKey()); - jdbcPoller.setPartitionCount(getPartitionCount()); - jdbcPoller.setPollInterval(getPollInterval()); - jdbcPoller.setTableName(getTableName()); - jdbcPoller.setBatchSize(getBatchSize()); - isPollable = false; - - LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]"); + ps = store.getConnection().prepareStatement( + buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY, + CONCUR_READ_ONLY); + LOG.info("Query formed to recover data - {}", ps.toString()); - jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left, recoveredData.right)); + emitReplayedTuples(ps); - jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement( - JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(), - jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); - LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString()); + } - emitReplayedTuples(jdbcPoller.ps); + if (currentWindowId == windowManager.getLargestRecoveryWindow()) { + try { + if (!isPollerPartition && rangeQueryPair.getValue() != null) { + ps = store.getConnection().prepareStatement( + buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY, + CONCUR_READ_ONLY); + } else { + Integer bound = null; + if (lastEmittedRow == null) { + bound = rangeQueryPair.getKey(); + } else { + bound = lastEmittedRow; + } + ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY, + CONCUR_READ_ONLY); + } + scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + } catch (SQLException e) { + throw new RuntimeException(e); + } } } catch (IOException e) { - throw new RuntimeException("replay", e); + throw new RuntimeException("Exception during replay of records.", e); } } + private boolean 
shouldReplayWindow(MutablePair<Integer, Integer> recoveredData) + { + if (recoveredData.left == null || recoveredData.right == null) { + return false; + } + if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) { + return false; + } + return true; + } + /** * Replays the tuples in sync mode for replayed windows */ public void emitReplayedTuples(PreparedStatement ps) { - LOG.debug("Emitting replayed statement is -" + ps.toString()); ResultSet rs = null; try (PreparedStatement pStat = ps;) { pStat.setFetchSize(getFetchSize()); - LOG.debug("sql query = {}", pStat); rs = pStat.executeQuery(); if (rs == null || rs.isClosed()) { - LOG.debug("Nothing to replay"); return; } while (rs.next()) { - previousUpperBound = rs.getObject(getKey()).toString(); - outputPort.emit((T)rs.getObject(getKey())); + emitTuple(getTuple(rs)); + lastEmittedRow++; } } catch (SQLException ex) { throw new RuntimeException(ex); @@ -565,59 +384,42 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu */ @Override public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions( - Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions, - com.datatorrent.api.Partitioner.PartitioningContext context) + Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context) { List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>( getPartitionCount()); - JdbcStore jdbcStore = new JdbcStore(); - jdbcStore.setDatabaseDriver(store.getDatabaseDriver()); - jdbcStore.setDatabaseUrl(store.getDatabaseUrl()); - jdbcStore.setConnectionProperties(store.getConnectionProperties()); - - jdbcStore.connect(); - HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null; + HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null; try { - partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(), - jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(), - store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password), - whereCondition, emitColumnList); + store.connect(); + intializeDSLContext(); + partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount()); } catch (SQLException e) { LOG.error("Exception in initializing the partition range", e); + throw new RuntimeException(e); + } finally { + store.disconnect(); } KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); + // The first n partitions are for range queries and the (n + 1)th partition is for the polling query for (int i = 0; i <= getPartitionCount(); i++) { - AbstractJdbcPollInputOperator<T> jdbcPoller = null; - - jdbcPoller = cloneUtils.getClone(); - - jdbcPoller.setStore(store); - jdbcPoller.setKey(getKey()); - jdbcPoller.setPartitionCount(getPartitionCount()); - jdbcPoller.setPollInterval(getPollInterval()); - jdbcPoller.setTableName(getTableName()); - jdbcPoller.setBatchSize(getBatchSize()); - jdbcPoller.setEmitColumnList(getEmitColumnList()); - - store.connect(); - //The n given partitions are for range queries and n + 1 partition is for polling query - //The upper bound for the n+1 partition is set to null since its a pollable partition + AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone(); if (i < getPartitionCount()) { - jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i)); - isPollable = false; + jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i); + jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey(); + jdbcPoller.isPollerPartition = false; } else { - jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null)); - isPollable = true; + // The upper bound for the n+1 partition is set to null since it's a pollable partition + int partitionKey = partitionToRangeMap.get(i - 1).getValue(); + jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null); + jdbcPoller.lastEmittedRow = partitionKey; + jdbcPoller.isPollerPartition = true; } - Partition<AbstractJdbcPollInputOperator<T>> po = new DefaultPartition<AbstractJdbcPollInputOperator<T>>( - jdbcPoller); - newPartitions.add(po); + newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller)); } - previousUpperBound = null; return newPartitions; } @@ -625,7 +427,69 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void partitioned( Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions) { - //Nothing to implement here + // Nothing to implement here + } + + private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions) + throws SQLException + { + int rowCount = 0; + try { + rowCount = getRecordsCount(); + } catch (SQLException e) { + LOG.error("Exception in getting the record range", e); + } + + HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>(); + int events = (rowCount / partitions); + for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) { + partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset)); + } + + partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), (int)rowCount)); + LOG.info("Partition map - " + partitionToQueryMap.toString()); + return partitionToQueryMap; + } + + /** + * Finds the total number of rows in the table + * + * @return number of records in table + */ + private int getRecordsCount() throws SQLException + { + Condition condition = DSL.trueCondition(); + if (getWhereCondition() != null) { + condition = condition.and(getWhereCondition()); + } + int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class); + return recordsCount; + } + + /** + * Helper function returns a range query based on the bounds passed<br> + */ + protected String buildRangeQuery(int offset, int limit) + { + Condition condition = DSL.trueCondition(); + if (getWhereCondition() != null) { + condition = condition.and(getWhereCondition()); + } + + String sqlQuery; + if (getColumnsExpression() != null) { + Collection<Field<?>> columns = new ArrayList<>(); + for (String column : getColumnsExpression().split(",")) { + columns.add(field(column)); + } + sqlQuery = create.select((Collection<?
extends SelectField<?>>)columns).from(getTableName()).where(condition) + .orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED); + } else { + sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit) + .offset(offset).getSQL(ParamType.INLINED); + } + LOG.info("DSL Query: " + sqlQuery); + return sqlQuery; } /** @@ -638,24 +502,219 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void run() { while (execute) { - try { - long startTs = System.currentTimeMillis(); - if ((isPollable && !isPolled) || !isPollable) { - pollRecords(ps); - } - long endTs = System.currentTimeMillis(); - long ioTime = endTs - startTs; - long sleepTime = pollInterval - ioTime; - LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime); - Thread.sleep(sleepTime > 0 ? sleepTime : 0); - } catch (Exception ex) { - cause.set(ex); - execute = false; + if ((isPollerPartition && !isPolled) || !isPollerPartition) { + pollRecords(); } } } } - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class); + @VisibleForTesting + protected void setScheduledExecutorService(ScheduledExecutorService service) + { + scanService = service; + } + + /** + * Gets {@link WindowDataManager} + * + * @return windowDataManager + */ + public WindowDataManager getWindowManager() + { + return windowManager; + } + + /** + * Sets {@link WindowDataManager} + * + * @param windowDataManager + */ + public void setWindowManager(WindowDataManager windowDataManager) + { + this.windowManager = windowDataManager; + } + + /** + * Gets non-polling static partitions count + * + * @return partitionCount + */ + public int getPartitionCount() + { + return partitionCount; + } + + /** + * Sets non-polling static partitions count + * + * @param partitionCount + */ + public void setPartitionCount(int partitionCount) + { + this.partitionCount = partitionCount; + } + + /** + * Returns the where clause + * + * @return whereCondition + */ + public String getWhereCondition() + { + return whereCondition; + } + + /** + * Sets the where clause + * + * @param whereCondition + */ + public void setWhereCondition(String whereCondition) + { + this.whereCondition = whereCondition; + } + + /** + * Returns the list of columns to select from the table + * + * @return columnsExpression + */ + public String getColumnsExpression() + { + return columnsExpression; + } + + /** + * Comma separated list of columns to select from the given table + * + * @param columnsExpression + */ + public void setColumnsExpression(String columnsExpression) + { + this.columnsExpression = columnsExpression; + } + + /** + * Returns the fetchsize for getting the results + * + * @return fetchSize + */ + public int getFetchSize() + { + return fetchSize; + } + + /** + * Sets the fetchsize for getting the results + * + * @param fetchSize + */ + public void setFetchSize(int fetchSize) + { + this.fetchSize = fetchSize; + } + + /** + * Returns the interval for polling the DB + * + * @return pollInterval + */ + public int getPollInterval() + { + return pollInterval; + } + + /** + * Sets the interval for polling the DB + * + * @param pollInterval + */ + public void setPollInterval(int pollInterval) + { + this.pollInterval = pollInterval; + } + + /** + * Returns the capacity of the emit queue + * + * @return queueCapacity + */ + public int getQueueCapacity() + { + return queueCapacity; + } + + /** + * Sets the capacity of the emit queue + * + * @param queueCapacity + */ + public void setQueueCapacity(int queueCapacity) + { + this.queueCapacity = queueCapacity; + } + + /** + * Returns the tableName which would be queried + * + * @return tableName + */ + public String getTableName() + { + return tableName; + } + + /** + * Sets the tableName to query + * + * @param tableName + */ + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + /** + * Returns batchSize indicating the number of elements to emit in a batch + * + * @return batchSize + */ + public int getBatchSize() + { + return batchSize; + } + + /** + * Sets batchSize for number of elements to emit in a batch + * + * @param batchSize + */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + /** + * Gets primary key column name + * + * @return key + */ + public String getKey() + { + return key; + } + + /** + * Sets primary key column name + * + * @param key + */ + public void setKey(String key) + { + this.key = key; + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java deleted file mode 100644 index 9ba353c..0000000 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java +++ /dev/null @@ -1,353 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.db.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.InterfaceStability.Evolving; - -import com.datatorrent.lib.util.KeyValPair; - -/** - * A utility class used to retrieve the metadata for a given unique key of a SQL - * table.
This class would emit range queries based on a primary index given - * - * LIMIT clause may not work with all databases or may not return the same - * results always<br> - * - * This utility has been tested with MySQL and where clause is supported<br> - * - * @Input - dbName,tableName, primaryKey - * @Output - map<operatorId,prepared statement> - * - */ -@Evolving -public class JdbcMetaDataUtility -{ - private static String DB_DRIVER = "com.mysql.jdbc.Driver"; - private static String DB_CONNECTION = ""; - private static String DB_USER = ""; - private static String DB_PASSWORD = ""; - private static String TABLE_NAME = ""; - private static String KEY_COLUMN = ""; - private static String WHERE_CLAUSE = null; - private static String COLUMN_LIST = null; - - private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); - - public JdbcMetaDataUtility() - { - - } - - public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) - { - DB_CONNECTION = dbConnection; - DB_USER = userName; - DB_PASSWORD = password; - TABLE_NAME = tableName; - KEY_COLUMN = key; - } - - /** - * Returns the database connection handle - * */ - private static Connection getDBConnection() - { - - Connection dbConnection = null; - - try { - Class.forName(DB_DRIVER); - } catch (ClassNotFoundException e) { - LOG.error("Driver not found", e); - throw new RuntimeException(e); - } - - try { - dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); - return dbConnection; - } catch (SQLException e) { - LOG.error("Exception in getting connection handle", e); - throw new RuntimeException(e); - } - - } - - private static String generateQueryString() - { - StringBuilder sb = new StringBuilder(); - sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME); - - if (WHERE_CLAUSE != null) { - sb.append(" WHERE " + WHERE_CLAUSE); - } - - return sb.toString(); - } - - /** - * Finds the total number of rows in the table - */ - private static long getRecordRange(String query) throws SQLException - { - long rowCount = 0; - Connection dbConnection = null; - PreparedStatement preparedStatement = null; - - try { - dbConnection = getDBConnection(); - preparedStatement = dbConnection.prepareStatement(query); - - ResultSet rs = preparedStatement.executeQuery(); - - while (rs.next()) { - rowCount = Long.parseLong(rs.getString(1)); - LOG.debug("# Rows - " + rowCount); - } - - } catch (SQLException e) { - LOG.error("Exception in retreiving result set", e); - throw new RuntimeException(e); - } finally { - if (preparedStatement != null) { - preparedStatement.close(); - } - if (dbConnection != null) { - dbConnection.close(); - } - } - return rowCount; - } - - /** - * Returns a pair of <upper,lower> bounds for each partition of the - * {@link JdbcPollInputOperator}} - */ - private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException - { - Connection dbConnection = null; - PreparedStatement psLowerBound = null; - PreparedStatement psUpperBound = null; - - StringBuilder lowerBound = new StringBuilder(); - StringBuilder upperBound = new StringBuilder(); - - KeyValPair<String, String> boundedQUeryPair = null; - - try { - dbConnection = getDBConnection(); - - /* - * Run this loop only for n-1 partitions. 
- * By default the last partition will have fewer records, since we are rounding off - * */ - - lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME); - upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME); - - if (WHERE_CLAUSE != null) { - lowerBound.append(" WHERE " + WHERE_CLAUSE); - upperBound.append(" WHERE " + WHERE_CLAUSE); - } - - lowerBound.append(" LIMIT " + (0 + lower) + ",1"); - upperBound.append(" LIMIT " + (upper - 1) + ",1"); - - psLowerBound = dbConnection.prepareStatement(lowerBound.toString()); - psUpperBound = dbConnection.prepareStatement(upperBound.toString()); - - ResultSet rsLower = psLowerBound.executeQuery(); - ResultSet rsUpper = psUpperBound.executeQuery(); - - String lowerVal = null; - String upperVal = null; - - while (rsLower.next()) { - lowerVal = rsLower.getString(KEY_COLUMN); - } - - while (rsUpper.next()) { - upperVal = rsUpper.getString(KEY_COLUMN); - } - - boundedQUeryPair = new KeyValPair<String, String>(lowerVal, upperVal); - - } catch (SQLException e) { - LOG.error("Exception in getting bounds for queries"); - throw new RuntimeException(e); - } finally { - if (psLowerBound != null) { - psLowerBound.close(); - } - if (psUpperBound != null) { - psUpperBound.close(); - } - if (dbConnection != null) { - dbConnection.close(); - } - } - return boundedQUeryPair; - - } - - /** - * Returns a map of partitionId to a KeyValPair of <lower,upper> of the query - * range<br> - * Ensures even distribution of records across partitions except the last - * partition By default the last partition for batch queries will have fewer - * records, since we are rounding off - * - * @throws SQLException - * - */ - private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events, - long rowCount) throws SQLException - { - HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap = new HashMap<Integer, KeyValPair<String, String>>(); - for (int i = 0, lowerOffset = 0, upperOffset = events; i < numberOfPartitions - - 1; i++, lowerOffset += events, upperOffset += events) { - - partitionToQueryMap.put(i, getQueryBounds(lowerOffset, upperOffset)); - } - - //Call to construct the lower and upper bounds for the last partition - partitionToQueryMap.put(numberOfPartitions - 1, getQueryBounds(events * (numberOfPartitions - 1), rowCount)); - - LOG.info("Partition map - " + partitionToQueryMap.toString()); - - return partitionToQueryMap; - } - - /** - * Helper function returns a range query based on the bounds passed<br> - * Invoked from the setup method of {@link - AbstractJdbcPollInputOperator} to - * initialize the preparedStatement in the given operator<br> - * Optional whereClause for conditional selections Optional columnList for - * projection - */ - public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper) - { - - StringBuilder sb = new StringBuilder(); - sb.append("SELECT "); - if (COLUMN_LIST != null) { - sb.append(COLUMN_LIST); - } else { - sb.append("*"); - } - sb.append(" from " + tableName + " WHERE "); - if (WHERE_CLAUSE != null) { - sb.append(WHERE_CLAUSE + " AND "); - } - sb.append(keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'"); - return sb.toString(); - } - - /** - * Helper function that constructs a query from the next highest key after an - * operator is restarted - */ - public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper) - { - StringBuilder sb = new StringBuilder(); - sb.append("SELECT "); 
- if (COLUMN_LIST != null) { - sb.append(COLUMN_LIST); - } else { - sb.append("*"); - } - sb.append(" from " + tableName + " WHERE "); - if (WHERE_CLAUSE != null) { - sb.append(WHERE_CLAUSE + " AND "); - } - sb.append(keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'"); - return sb.toString(); - } - - /** - * Helper function that constructs a query for polling from outside the given - * range - */ - public static String buildPollableQuery(String tableName, String keyColumn, String lower) - { - StringBuilder sb = new StringBuilder(); - sb.append("SELECT "); - if (COLUMN_LIST != null) { - sb.append(COLUMN_LIST); - } else { - sb.append("*"); - } - sb.append(" from " + tableName + " WHERE "); - if (WHERE_CLAUSE != null) { - sb.append(WHERE_CLAUSE + " AND "); - } - sb.append(keyColumn + " > '" + lower + "' "); - return sb.toString(); - } - - /** - * Called by the partitioner from {@link - AbstractJdbcPollInputOperator}<br> - * Finds the range of query per partition<br> - * Returns a map of partitionId to PreparedStatement based on the range - * computed - * - * @throws SQLException - */ - public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver, - String dbConnection, String tableName, String key, String userName, String password, String whereClause, - String columnList) throws SQLException - { - long rowCount = 0L; - try { - DB_CONNECTION = dbConnection; - DB_USER = userName; - DB_PASSWORD = password; - TABLE_NAME = tableName; - KEY_COLUMN = key; - WHERE_CLAUSE = whereClause; - COLUMN_LIST = columnList; - DB_DRIVER = dbDriver; - rowCount = getRecordRange(generateQueryString()); - } catch (SQLException e) { - LOG.error("Exception in getting the record range", e); - } - return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount); - } - - /** - * Returns the rounded offset to arrive at a range query - */ - private static int getOffset(long rowCount, int partitions) - { - if (rowCount % partitions == 0) { - return (int)(rowCount / partitions); - } else { - return (int)((rowCount - (rowCount % (partitions - 1))) / (partitions - 1)); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java new file mode 100644 index 0000000..91821be --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator.ActiveFieldInfo; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.PojoUtils; + +/** + * A concrete implementation for {@link AbstractJdbcPollInputOperator} to + * consume data from jdbc store and emit POJO for each record <br> + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc + */ +@Evolving +public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Object> +{ + private final transient List<ActiveFieldInfo> columnFieldSetters = Lists.newArrayList(); + protected List<Integer> columnDataTypes; + protected transient Class<?> pojoClass; + @NotNull + private List<FieldInfo> fieldInfos; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + try { + // closing the query statement in super class as it is not needed + if (getColumnsExpression() == null) { + StringBuilder columns = new StringBuilder(); + for (int i = 0; i < fieldInfos.size(); i++) { + columns.append(fieldInfos.get(i).getColumnName()); + if (i < fieldInfos.size() - 1) { + columns.append(","); + } + } + setColumnsExpression(columns.toString()); + LOG.debug("select expr {}", columns.toString()); + } + + if (columnDataTypes == null) { + populateColumnDataTypes(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + for (FieldInfo fi : fieldInfos) { + columnFieldSetters.add(new ActiveFieldInfo(fi)); + } + } + + @Override + public void activate(Context.OperatorContext context) + { + for (int i = 0; i < columnDataTypes.size(); i++) { + final int type = columnDataTypes.get(i); + JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i); + switch (type) { + case (Types.CHAR): + case (Types.VARCHAR): + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), String.class); + break; + + case (Types.BOOLEAN): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.TINYINT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.SMALLINT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + 
break; + + case (Types.INTEGER): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.BIGINT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.FLOAT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.DOUBLE): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.DECIMAL: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class); + break; + + case Types.TIMESTAMP: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class); + break; + + case Types.TIME: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class); + break; + + case Types.DATE: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class); + break; + + default: + throw new RuntimeException("unsupported data type " + type); + } + } + super.activate(context); + } + + protected void populateColumnDataTypes() throws SQLException + { + columnDataTypes = Lists.newArrayList(); + PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1)); + try (ResultSet rs = stmt.executeQuery()) { + Map<String, Integer> nameToType = Maps.newHashMap(); + ResultSetMetaData rsMetaData = rs.getMetaData(); + LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); + + for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { + int type = rsMetaData.getColumnType(i); + String name = rsMetaData.getColumnName(i); + LOG.debug("column name {} type {}", name, type); + nameToType.put(name, type); + } + + for (FieldInfo fieldInfo : fieldInfos) { + columnDataTypes.add(nameToType.get(fieldInfo.getColumnName())); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public Object getTuple(ResultSet result) + { + Object obj; + try { + obj = pojoClass.newInstance(); + } catch (InstantiationException | IllegalAccessException ex) { + store.disconnect(); + throw new RuntimeException(ex); + } + + try { + for (int i = 0; i < fieldInfos.size(); i++) { + int type = columnDataTypes.get(i); + ActiveFieldInfo afi = columnFieldSetters.get(i); + switch (type) { + case Types.CHAR: + case Types.VARCHAR: + String strVal = result.getString(i + 1); + ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(obj, strVal); + break; + + case Types.BOOLEAN: + boolean boolVal = result.getBoolean(i + 1); + ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(obj, boolVal); + break; + + case Types.TINYINT: + byte byteVal = result.getByte(i + 1); + ((PojoUtils.SetterByte<Object>)afi.setterOrGetter).set(obj, byteVal); + break; + + case Types.SMALLINT: + short shortVal = result.getShort(i + 1); + ((PojoUtils.SetterShort<Object>)afi.setterOrGetter).set(obj, shortVal); + break; + + case Types.INTEGER: + int intVal = result.getInt(i + 1); + ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(obj, intVal); + break; + + case Types.BIGINT: + long longVal = result.getLong(i + 1); + 
((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, longVal); + break; + + case Types.FLOAT: + float floatVal = result.getFloat(i + 1); + ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(obj, floatVal); + break; + + case Types.DOUBLE: + double doubleVal = result.getDouble(i + 1); + ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(obj, doubleVal); + break; + + case Types.DECIMAL: + BigDecimal bdVal = result.getBigDecimal(i + 1); + ((PojoUtils.Setter<Object, BigDecimal>)afi.setterOrGetter).set(obj, bdVal); + break; + + case Types.TIMESTAMP: + Timestamp tsVal = result.getTimestamp(i + 1); + ((PojoUtils.Setter<Object, Timestamp>)afi.setterOrGetter).set(obj, tsVal); + break; + + case Types.TIME: + Time timeVal = result.getTime(i + 1); + ((PojoUtils.Setter<Object, Time>)afi.setterOrGetter).set(obj, timeVal); + break; + + case Types.DATE: + Date dateVal = result.getDate(i + 1); + ((PojoUtils.Setter<Object, Date>)afi.setterOrGetter).set(obj, dateVal); + break; + + default: + throw new RuntimeException("unsupported data type " + type); + } + } + return obj; + } catch (SQLException e) { + store.disconnect(); + throw new RuntimeException("fetching metadata", e); + } + } + + @Override + protected void emitTuple(Object obj) + { + outputPort.emit(obj); + } + + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo + * field name. + */ + public List<FieldInfo> getFieldInfos() + { + return fieldInfos; + } + + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a + * pojo field name.<br/> + * The value from fieldInfo.column is assigned to + * fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the database column name + * @description $[].pojoFieldExpression pojo field name or expression + * @useSchema $[].pojoFieldExpression outputPort.fields[].name + */ + public void setFieldInfos(List<FieldInfo> fieldInfos) + { + this.fieldInfos = fieldInfos; + } + + private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOPollInputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java index 45f96bf..518ac17 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java @@ -18,214 +18,66 @@ */ package com.datatorrent.lib.db.jdbc; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Evolving; import com.google.common.collect.Lists; import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; /** - * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for - * consuming data from MySQL using JDBC interface <br> - * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key - * <br> - * Optionally batchSize,pollInterval,Look-up key and a where clause can be given - * 
<br> - * This operator uses static partitioning to arrive at range queries for exactly - * once reads<br> - * Assumption is that there is an ordered column using which range queries can - * be formed<br> - * If an emitColumnList is provided, please ensure that the keyColumn is the - * first column in the list<br> - * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - - * comma separated list of the emit columns eg columnA,columnB,columnC + * A concrete implementation for {@link AbstractJdbcPollInputOperator} to + * consume data from jdbc store and emit comma separated values <br> * * @displayName Jdbc Polling Input Operator * @category Input * @tags database, sql, jdbc */ @Evolving -@OperatorAnnotation(checkpointableWithinAppWindow = false) -public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object> +public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<String> { - private long lastBatchWindowId; - private transient long currentWindowId; - private long lastCreationTsMillis; - private long fetchBackMillis = 0L; + @OutputPortFieldAnnotation + public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>(); private ArrayList<String> emitColumns; - private transient int count = 0; - - /** - * Returns the emit columns - */ - public List<String> getEmitColumns() - { - return emitColumns; - } - - /** - * Sets the emit columns - */ - public void setEmitColumns(ArrayList<String> emitColumns) - { - this.emitColumns = emitColumns; - } - - /** - * Returns fetchBackMilis - */ - public long getFetchBackMillis() - { - return fetchBackMillis; - } - - /** - * Sets fetchBackMilis - used in polling - */ - public void setFetchBackMillis(long fetchBackMillis) - { - this.fetchBackMillis = fetchBackMillis; - } @Override public void setup(OperatorContext context) { super.setup(context); - parseEmitColumnList(getEmitColumnList()); - lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis; + parseEmitColumnList(); } - private void parseEmitColumnList(String columnList) + private void parseEmitColumnList() { - String[] cols = columnList.split(","); - ArrayList<String> arr = Lists.newArrayList(); + String[] cols = getColumnsExpression().split(","); + emitColumns = Lists.newArrayList(); for (int i = 0; i < cols.length; i++) { - arr.add(cols[i]); + emitColumns.add(cols[i]); } - setEmitColumns(arr); - } - - @Override - public void beginWindow(long l) - { - super.beginWindow(l); - currentWindowId = l; } @Override - protected void pollRecords(PreparedStatement ps) + public String getTuple(ResultSet rs) { - ResultSet rs = null; - List<Object> metaList = new ArrayList<>(); - - if (isReplayed) { - return; - } - + StringBuilder resultTuple = new StringBuilder(); try { - if (ps.isClosed()) { - LOG.debug("Returning due to closed ps for non-pollable partitions"); - return; + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); } + return resultTuple.substring(0, resultTuple.length() - 1); //remove last comma } catch (SQLException e) { - LOG.error("Prepared statement is closed", e); throw new RuntimeException(e); } - - try (PreparedStatement pStat = ps;) { - pStat.setFetchSize(getFetchSize()); - LOG.debug("sql query = {}", pStat); - rs = pStat.executeQuery(); - boolean hasNext = false; - - if (rs == null || rs.isClosed()) { - return; - } - - while ((hasNext = rs.next())) { - Object key = null; - StringBuilder resultTuple = new StringBuilder(); - try { - if (count < getBatchSize()) { - key = 
rs.getObject(getKey()); - for (String obj : emitColumns) { - resultTuple.append(rs.getObject(obj) + ","); - } - metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); - count++; - } else { - emitQueue.add(metaList); - metaList = new ArrayList<>(); - key = rs.getObject(getKey()); - for (String obj : emitColumns) { - resultTuple.append(rs.getObject(obj) + ","); - } - metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); - count = 0; - } - } catch (NullPointerException npe) { - LOG.error("Key not found" + npe); - throw new RuntimeException(npe); - } - if (isPollable) { - highestPolled = key.toString(); - isPolled = true; - } - } - /*Flush the remaining records once the result set is over and batch-size is not reached, - * Dont flush if its pollable*/ - if (!hasNext) { - if ((isPollable && isPolled) || !isPollable) { - emitQueue.offer(metaList); - metaList = new ArrayList<>(); - count = 0; - } - if (!isPolled) { - isPolled = true; - } - } - LOG.debug("last creation time stamp = {}", lastCreationTsMillis); - } catch (SQLException ex) { - throw new RuntimeException(ex); - } } @Override - public void emitTuples() + protected void emitTuple(String tuple) { - if (isReplayed) { - LOG.debug( - "Calling emit tuples during window - " + currentWindowId + "::" + windowManager.getLargestRecoveryWindow()); - LOG.debug("Returning for replayed window"); - return; - } - - List<Object> tuples; - - if (lastBatchWindowId < currentWindowId) { - if ((tuples = emitQueue.poll()) != null) { - for (Object tuple : tuples) { - String[] str = tuple.toString().split(","); - if (lower == null) { - lower = str[0]; - } - upper = str[0]; - outputPort.emit(tuple); - } - lastBatchWindowId = currentWindowId; - } - } + outputPort.emit(tuple); } - - private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class); }
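----------------------------------------------------------------------
Usage sketch (illustrative, not part of this commit)
----------------------------------------------------------------------
The sketch below shows how the reworked poller might be wired into an Apex application, using only the setters visible in this diff (setStore, setTableName, setKey, setColumnsExpression, setFieldInfos, setPartitionCount, setPollInterval, setBatchSize, setWindowManager). The table name, column names, driver/URL and the AccountEvent POJO are assumptions for illustration, not part of the commit.

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.hadoop.conf.Configuration;

import com.google.common.collect.Lists;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.util.FieldInfo;

public class JdbcPollApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator());

    JdbcStore store = new JdbcStore();
    store.setDatabaseDriver("org.hsqldb.jdbcDriver"); // assumption: any JDBC driver on the classpath
    store.setDatabaseUrl("jdbc:hsqldb:mem:test");     // assumption: illustrative URL
    poller.setStore(store);

    poller.setTableName("test_event_table");               // assumption
    poller.setKey("row_id");                               // ordered unique column used to form range queries
    poller.setColumnsExpression("row_id,account_no,name"); // columns projected by the generated queries
    // Column-to-POJO-field mapping; the operator derives column types from
    // ResultSet metadata, so the type argument is left null here.
    poller.setFieldInfos(Lists.newArrayList(
        new FieldInfo("row_id", "rowId", null),
        new FieldInfo("account_no", "accountNo", null),
        new FieldInfo("name", "name", null)));

    poller.setPartitionCount(2);  // 2 static range partitions + 1 additional polling partition
    poller.setPollInterval(1000); // milliseconds between scans by the polling partition
    poller.setBatchSize(500);     // tuples drained from the emit queue per emitTuples() call
    poller.setWindowManager(new FSWindowDataManager()); // enables replay of recovered windows

    // The output port is schema-required; AccountEvent is a placeholder POJO.
    dag.setOutputPortAttribute(poller.outputPort, Context.PortContext.TUPLE_CLASS, AccountEvent.class);

    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
    dag.addStream("records", poller.outputPort, console.input);
  }

  // Placeholder POJO; PojoUtils can set public fields directly.
  public static class AccountEvent
  {
    public int rowId;
    public int accountNo;
    public String name;
  }
}

With such a configuration, buildRangeQuery(0, 1000) would have jOOQ render an inlined statement roughly of the form "select row_id, account_no, name from test_event_table where 1 = 1 order by row_id limit 1000 offset 0"; the exact quoting and limit/offset syntax depend on the SQL dialect detected from the database URL.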
