Repository: apex-malhar Updated Branches: refs/heads/master 132012823 -> aaa4464f0
APEXMALHAR-2066 JdbcPolling,idempotent,partitionable Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4c7d268d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4c7d268d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4c7d268d Branch: refs/heads/master Commit: 4c7d268dbcdb6adbe84daadac6787c4d2f6b203d Parents: 9b62506 Author: devtagare <[email protected]> Authored: Wed May 18 15:25:56 2016 -0700 Committer: devtagare <[email protected]> Committed: Thu Jul 14 11:42:36 2016 -0700 ---------------------------------------------------------------------- .../db/jdbc/AbstractJdbcPollInputOperator.java | 661 +++++++++++++++++++ .../lib/db/jdbc/JdbcMetaDataUtility.java | 353 ++++++++++ .../lib/db/jdbc/JdbcPollInputOperator.java | 231 +++++++ .../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 +++++++ 4 files changed, 1491 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java new file mode 100644 index 0000000..ab12ed3 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -0,0 +1,661 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package com.datatorrent.lib.db.jdbc;

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import javax.validation.constraints.Min;

import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.classification.InterfaceStability.Evolving;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;

/**
 * Abstract operator for consuming data using the JDBC interface.<br>
 * The user needs to provide tableName, dbConnection, emitColumnList and a look-up key.<br>
 * Optionally batchSize, pollInterval and a where clause can be given.<br>
 * This operator uses static partitioning to arrive at range queries for exactly-once reads.<br>
 * It creates a configured number of non-polling static partitions for fetching the data already in the table,
 * plus one additional partition that polls for newly added data. The assumption is that there is an ordered
 * column from which range queries can be formed.<br>
 * If an emitColumnList is provided, please ensure that the keyColumn is the first column in the list.<br>
 * Range queries are formed using {@link JdbcMetaDataUtility}. Output - comma separated list of the emit
 * columns, e.g. columnA,columnB,columnC<br>
 * Only newly added data with increasing ids will be fetched by the polling JDBC partition.
 *
 * In future iterations this operator would support an in-clause for idempotency instead of only range
 * queries, so that non-ordered key columns can be supported.
 *
 * @displayName Jdbc Polling Input Operator
 * @category Input
 * @tags database, sql, jdbc, partitionable,exactlyOnce
 */
@Evolving
public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
{
  /**
   * Keys under which the store's connection properties carry the credentials. These must be the standard JDBC
   * "user"/"password" keys; an empty key never matches, and the partitioner would connect without credentials.
   */
  private static final String USER_PROPERTY = "user";
  private static final String PASSWORD_PROPERTY = "password";

  /**
   * Interval in milliseconds between successive database polls. This is an instance field (not static) so that
   * the configured value is cloned into every partition and is not shared between operator instances.
   */
  private int pollInterval = 10000;

  @Min(1)
  private int partitionCount = 1;
  protected transient int operatorId;
  protected transient boolean isReplayed;
  protected transient boolean isPollable;
  protected int batchSize;
  /**
   * JDBC fetch size hint; per instance for the same reason as {@link #pollInterval}.
   */
  protected int fetchSize = 20000;
  /**
   * (lower bound, upper bound) of the tuples emitted in the current window. It is saved to the window data
   * manager in {@link #endWindow()} so that the window can be replayed.
   */
  protected transient MutablePair<String, String> currentWindowRecoveryState;

  /**
   * Capacity of the queue that holds polled batches before they are emitted; per instance, like
   * {@link #pollInterval}.
   */
  private int queueCapacity = 4 * 1024 * 1024;
  private transient volatile boolean execute;
  private transient AtomicReference<Throwable> cause;
  protected transient int spinMillis;
  private transient OperatorContext context;
  protected String tableName;
  protected String key;
  protected long currentWindowId;
  protected KeyValPair<String, String> rangeQueryPair;
  protected String lower;
  protected String upper;
  protected boolean recovered;
  protected boolean isPolled;
  protected String whereCondition = null;
  protected String previousUpperBound;
  protected String highestPolled;
  /**
   * Thread that polls the database.
   */
  private transient Thread dbPoller;
  protected transient ArrayBlockingQueue<List<T>> emitQueue;
  protected transient PreparedStatement ps;
  protected WindowDataManager windowManager;
  private String emitColumnList;

  /**
   * Returns the where clause.
   */
  public String getWhereCondition()
  {
    return whereCondition;
  }

  /**
   * Sets the where clause.
   */
  public void setWhereCondition(String whereCondition)
  {
    this.whereCondition = whereCondition;
  }

  /**
   * Returns the comma separated list of columns to select.
   */
  public String getEmitColumnList()
  {
    return emitColumnList;
  }

  /**
   * Sets the comma separated list of columns to select from the given table.
   */
  public void setEmitColumnList(String emitColumnList)
  {
    this.emitColumnList = emitColumnList;
  }

  /**
   * Returns the JDBC fetch size used when reading results.
   */
  public int getFetchSize()
  {
    return fetchSize;
  }

  /**
   * Sets the JDBC fetch size used when reading results.
   */
  public void setFetchSize(int fetchSize)
  {
    this.fetchSize = fetchSize;
  }

  /**
   * Executes the given statement and hands the fetched records to the operator; called from the poller thread.
   */
  protected abstract void pollRecords(PreparedStatement ps);

  /**
   * Returns the interval, in milliseconds, between successive database polls.
   */
  public int getPollInterval()
  {
    return pollInterval;
  }

  /**
   * Sets the interval, in milliseconds, between successive database polls.
   */
  public void setPollInterval(int pollInterval)
  {
    this.pollInterval = pollInterval;
  }

  /**
   * Returns the capacity of the emit queue.
   */
  public int getQueueCapacity()
  {
    return queueCapacity;
  }

  /**
   * Sets the capacity of the emit queue.
   */
  public void setQueueCapacity(int queueCapacity)
  {
    this.queueCapacity = queueCapacity;
  }

  /**
   * Returns the ordered key used to generate the range queries.
   */
  public String getKey()
  {
    return key;
  }

  /**
   * Sets the ordered key used to generate the range queries.
   */
  public void setKey(String key)
  {
    this.key = key;
  }

  /**
   * Returns the name of the table that is queried.
   */
  public String getTableName()
  {
    return tableName;
  }

  /**
   * Sets the name of the table to query.
   */
  public void setTableName(String tableName)
  {
    this.tableName = tableName;
  }

  /**
   * Returns the (lowerBound, upperBound) pair of this partition; the upper bound is null for the polling partition.
   */
  public KeyValPair<String, String> getRangeQueryPair()
  {
    return rangeQueryPair;
  }

  /**
   * Sets the (lowerBound, upperBound) pair of this partition.
   */
  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
  {
    this.rangeQueryPair = rangeQueryPair;
  }

  /**
   * Returns the batch size, i.e. the number of records per element of the emit queue.
   */
  public int getBatchSize()
  {
    return batchSize;
  }

  /**
   * Sets the batch size, i.e. the number of records per element of the emit queue.
   */
  public void setBatchSize(int batchSize)
  {
    this.batchSize = batchSize;
  }

  public AbstractJdbcPollInputOperator()
  {
    currentWindowRecoveryState = new MutablePair<>();
    windowManager = new FSWindowDataManager();
  }

  /**
   * Prepares a forward-only, read-only statement for the given SQL on the store's connection.
   */
  private PreparedStatement prepareForwardOnlyStatement(String sql) throws SQLException
  {
    return store.getConnection().prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
  }

  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    execute = true;
    cause = new AtomicReference<Throwable>();
    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    this.context = context;
    operatorId = context.getId();

    try {
      // A partition with an upper bound runs a bounded range query; the partition without one polls for rows
      // beyond its lower bound.
      if (getRangeQueryPair().getValue() != null) {
        ps = prepareForwardOnlyStatement(JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(),
            rangeQueryPair.getKey(), rangeQueryPair.getValue()));
      } else {
        ps = prepareForwardOnlyStatement(
            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()));
        isPollable = true;
      }
    } catch (SQLException e) {
      LOG.error("Exception in initializing the range query for a given partition", e);
      throw new RuntimeException(e);
    }

    windowManager.setup(context);
    LOG.debug("super setup done...");
  }

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
    isReplayed = false;

    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
      try {
        replay(currentWindowId);
      } catch (SQLException e) {
        LOG.error("Exception in replayed windows", e);
        throw new RuntimeException(e);
      }
    }

    // After the last recovered window has been replayed, re-initialize the statement so that live reading
    // resumes after the highest replayed key.
    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
      try {
        if (!isPollable && rangeQueryPair.getValue() != null) {
          // With no replayed key there is nothing to skip: re-read the whole range instead of querying "> 'null'"
          String query = previousUpperBound == null
              ? JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
                  rangeQueryPair.getValue())
              : JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
                  rangeQueryPair.getValue());
          ps = prepareForwardOnlyStatement(query);
        } else {
          String bound = previousUpperBound == null ? getRangeQueryPair().getKey() : previousUpperBound;
          ps = prepareForwardOnlyStatement(JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound));
          isPollable = true;
        }
        isReplayed = false;
        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    // The polling partition re-creates its query every window, starting after highestPolled (presumably
    // maintained by the subclass's pollRecords), falling back to previousUpperBound and then the range lower bound.
    if (isPollable) {
      try {
        String bound;
        if (highestPolled != null) {
          bound = highestPolled;
        } else if (previousUpperBound != null) {
          bound = previousUpperBound;
        } else {
          bound = getRangeQueryPair().getKey();
        }
        // NOTE(review): the previous statement is not closed here; the poller thread may still be executing it.
        ps = prepareForwardOnlyStatement(JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound));
        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
        isPolled = false;
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    lower = null;
    upper = null;

    // Start the poller thread lazily and never while windows are still being replayed; starting it from setup()
    // would conflict with the synchronous replay.
    if (dbPoller == null && !isReplayed) {
      LOG.info("Statement when re-initialized {}", ps.toString());
      dbPoller = new Thread(new DBPoller());
      dbPoller.start();
    }
  }

  /**
   * Emits at most one polled batch per call, tracking the first and last emitted tuple (as strings) for
   * this window's recovery state. Does nothing while a window is being replayed.
   */
  @Override
  public void emitTuples()
  {
    if (isReplayed) {
      return;
    }

    List<T> tuples = emitQueue.poll();
    if (tuples == null) {
      return;
    }
    for (T tuple : tuples) {
      if (lower == null) {
        lower = tuple.toString();
      }
      upper = tuple.toString();
      outputPort.emit(tuple);
    }
  }

  /**
   * For windows past the recovery point, records the emitted (lower, upper) pair with the window data manager.
   */
  @Override
  public void endWindow()
  {
    if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
      if (lower != null && upper != null) {
        previousUpperBound = upper;
        isPolled = true;
      }
      currentWindowRecoveryState = new MutablePair<>(lower, upper);
      try {
        windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
      } catch (IOException e) {
        throw new RuntimeException("saving recovery", e);
      }
    }
    currentWindowRecoveryState = new MutablePair<>();
  }

  public int getPartitionCount()
  {
    return partitionCount;
  }

  public void setPartitionCount(int partitionCount)
  {
    this.partitionCount = partitionCount;
  }

  /**
   * Intentionally starts nothing: the poller thread is started from {@link #beginWindow(long)} once replay is over.
   */
  @Override
  public void activate(Context cntxt)
  {
    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
      // If it is a replay state, don't start any threads here
      return;
    }
  }

  /**
   * Interrupts the poller thread and waits for it to finish.
   */
  @Override
  public void deactivate()
  {
    try {
      if (dbPoller != null && dbPoller.isAlive()) {
        dbPoller.interrupt();
        dbPoller.join();
      }
    } catch (InterruptedException ex) {
      // log and ignore, ending execution anyway; keep the interrupt visible to the caller
      LOG.error("exception in poller thread: ", ex);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Sleeps for spinMillis while the poller is healthy; otherwise rethrows the failure recorded by the poller thread.
   */
  @Override
  public void handleIdleTime()
  {
    if (execute) {
      try {
        Thread.sleep(spinMillis);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
    } else {
      LOG.error("Exception: ", cause.get());
      DTThrowable.rethrow(cause.get());
    }
  }

  /**
   * Replays the given window synchronously by re-reading the (lower, upper) key range saved for it.
   *
   * @param windowId window being replayed
   * @throws SQLException if the replay statement cannot be prepared
   */
  @SuppressWarnings("unchecked")
  protected void replay(long windowId) throws SQLException
  {
    isReplayed = true;

    MutablePair<String, String> recoveredData;
    try {
      recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
    } catch (IOException e) {
      throw new RuntimeException("replay", e);
    }

    if (recoveredData == null) {
      return;
    }
    // skip the window and return if there was no incoming data in the window
    if (recoveredData.left == null || recoveredData.right == null) {
      return;
    }
    if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
      LOG.info("Matched so returning");
      return;
    }

    isPollable = false;
    LOG.debug("[Window ID -{},{},{}]", windowId, recoveredData.left, recoveredData.right);

    PreparedStatement replayStatement = prepareForwardOnlyStatement(
        JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), recoveredData.left, recoveredData.right));
    LOG.info("Query formed for recovered data - {}", replayStatement.toString());

    emitReplayedTuples(replayStatement);
  }

  /**
   * Replays the tuples in sync mode for replayed windows. Closes the given statement when done.
   * Emits the key column value of every row and records it as previousUpperBound.
   */
  @SuppressWarnings("unchecked")
  public void emitReplayedTuples(PreparedStatement ps)
  {
    LOG.debug("Emitting replayed statement is -{}", ps);
    try (PreparedStatement pStat = ps) {
      pStat.setFetchSize(getFetchSize());
      LOG.debug("sql query = {}", pStat);
      try (ResultSet rs = pStat.executeQuery()) {
        if (rs == null || rs.isClosed()) {
          LOG.debug("Nothing to replay");
          return;
        }
        while (rs.next()) {
          previousUpperBound = rs.getObject(getKey()).toString();
          outputPort.emit((T)rs.getObject(getKey()));
        }
      }
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Uses a static partitioning scheme: 'n' partitions read non-overlapping key ranges, and an extra (n+1)th
   * partition polls for records beyond the last range.
   */
  @Override
  public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
      com.datatorrent.api.Partitioner.PartitioningContext context)
  {
    int rangePartitions = getPartitionCount();
    Map<Integer, KeyValPair<String, String>> partitionToRangeMap = computePartitionRanges();

    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions =
        new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(rangePartitions + 1);
    KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);

    for (int i = 0; i <= rangePartitions; i++) {
      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();

      jdbcPoller.setStore(store);
      jdbcPoller.setKey(getKey());
      jdbcPoller.setPartitionCount(getPartitionCount());
      jdbcPoller.setPollInterval(getPollInterval());
      jdbcPoller.setTableName(getTableName());
      jdbcPoller.setBatchSize(getBatchSize());
      jdbcPoller.setEmitColumnList(getEmitColumnList());

      // The store is connected by each partition's setup(); connecting it here would only leak connections.
      // The n given partitions are for range queries and partition n + 1 is for the polling query, whose upper
      // bound is null since it is unbounded.
      if (i < rangePartitions) {
        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
        jdbcPoller.isPollable = false;
      } else {
        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null));
        jdbcPoller.isPollable = true;
      }
      newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
    }

    previousUpperBound = null;
    return newPartitions;
  }

  /**
   * Computes the key range of every static partition, using a temporary connection that is always released.
   *
   * @throws RuntimeException if the ranges cannot be computed (instead of failing later on a null map)
   */
  private HashMap<Integer, KeyValPair<String, String>> computePartitionRanges()
  {
    JdbcStore jdbcStore = new JdbcStore();
    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
    jdbcStore.setConnectionProperties(store.getConnectionProperties());

    jdbcStore.connect();
    try {
      return JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(), jdbcStore.getDatabaseDriver(),
          jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
          store.getConnectionProperties().getProperty(USER_PROPERTY),
          store.getConnectionProperties().getProperty(PASSWORD_PROPERTY), whereCondition, emitColumnList);
    } catch (SQLException e) {
      LOG.error("Exception in initializing the partition range", e);
      throw new RuntimeException(e);
    } finally {
      jdbcStore.disconnect();
    }
  }

  @Override
  public void partitioned(
      Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
  {
    //Nothing to implement here
  }

  /**
   * Polls a store that can be queried with a JDBC interface. The operator thread replaces the prepared statement
   * as more rows are read. Any failure is recorded in {@code cause} and stops the loop.
   */
  public class DBPoller implements Runnable
  {
    @Override
    public void run()
    {
      while (execute) {
        try {
          long startTs = System.currentTimeMillis();
          // Range partitions always poll; the polling partition skips polling while isPolled is set
          if (!isPollable || !isPolled) {
            pollRecords(ps);
          }
          long ioTime = System.currentTimeMillis() - startTs;
          long sleepTime = pollInterval - ioTime;
          LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime);
          Thread.sleep(sleepTime > 0 ? sleepTime : 0);
        } catch (Exception ex) {
          cause.set(ex);
          execute = false;
        }
      }
    }
  }

  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java new file mode 100644 index 0000000..9ba353c --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package com.datatorrent.lib.db.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceStability.Evolving;

import com.datatorrent.lib.util.KeyValPair;

/**
 * A utility class used to retrieve the metadata for a given unique key of a SQL table. It produces range queries
 * over the given ordered key column.
 *
 * Partition boundaries are located with an ORDER BY + LIMIT offset query; the LIMIT clause may not work with all
 * databases.<br>
 *
 * This utility has been tested with MySQL and the where clause is supported.<br>
 *
 * NOTE(review): the connection settings, where clause and column list live in static fields that only
 * {@link #getPartitionedQueryMap} assigns (the 5-argument constructor assigns all but the where clause and
 * column list). The query builders read WHERE_CLAUSE and COLUMN_LIST from these statics. In a JVM where
 * getPartitionedQueryMap has not run (presumably a deployed operator container), queries are therefore built
 * without them. Concurrent callers also overwrite each other's settings.
 *
 * Input - dbName, tableName, primaryKey<br>
 * Output - map of partitionId to the (lower, upper) key bounds of that partition
 */
@Evolving
public class JdbcMetaDataUtility
{
  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
  private static String DB_CONNECTION = "";
  private static String DB_USER = "";
  private static String DB_PASSWORD = "";
  private static String TABLE_NAME = "";
  private static String KEY_COLUMN = "";
  private static String WHERE_CLAUSE = null;
  private static String COLUMN_LIST = null;

  private static final Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);

  public JdbcMetaDataUtility()
  {
  }

  /**
   * Stores the connection settings, table name and key column in the shared static fields.
   */
  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
  {
    DB_CONNECTION = dbConnection;
    DB_USER = userName;
    DB_PASSWORD = password;
    TABLE_NAME = tableName;
    KEY_COLUMN = key;
  }

  /**
   * Returns a new database connection built from the static settings; the caller must close it.
   *
   * @throws RuntimeException if the driver cannot be loaded or the connection cannot be opened
   */
  private static Connection getDBConnection()
  {
    try {
      Class.forName(DB_DRIVER);
    } catch (ClassNotFoundException e) {
      LOG.error("Driver not found", e);
      throw new RuntimeException(e);
    }

    try {
      return DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
    } catch (SQLException e) {
      LOG.error("Exception in getting connection handle", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Builds the row-count query for the configured table and optional where clause.
   */
  private static String generateQueryString()
  {
    StringBuilder sb = new StringBuilder("SELECT COUNT(*) as RowCount from ").append(TABLE_NAME);
    if (WHERE_CLAUSE != null) {
      sb.append(" WHERE ").append(WHERE_CLAUSE);
    }
    return sb.toString();
  }

  /**
   * Finds the total number of rows in the table by running the given count query.
   *
   * @throws RuntimeException wrapping any SQLException (including failures while closing resources)
   */
  private static long getRecordRange(String query)
  {
    long rowCount = 0;
    try (Connection dbConnection = getDBConnection();
        PreparedStatement preparedStatement = dbConnection.prepareStatement(query);
        ResultSet rs = preparedStatement.executeQuery()) {
      while (rs.next()) {
        rowCount = Long.parseLong(rs.getString(1));
        LOG.debug("# Rows - " + rowCount);
      }
    } catch (SQLException e) {
      LOG.error("Exception in retreiving result set", e);
      throw new RuntimeException(e);
    }
    return rowCount;
  }

  /**
   * Builds a query that selects the key of the row at the given zero-based offset in key order. The ORDER BY is
   * what makes the offsets correspond to the ordered key positions that BETWEEN range queries rely on.
   */
  private static String buildOffsetQuery(long offset)
  {
    StringBuilder sb = new StringBuilder("SELECT ").append(KEY_COLUMN).append(" FROM ").append(TABLE_NAME);
    if (WHERE_CLAUSE != null) {
      sb.append(" WHERE ").append(WHERE_CLAUSE);
    }
    sb.append(" ORDER BY ").append(KEY_COLUMN);
    sb.append(" LIMIT ").append(offset).append(",1");
    return sb.toString();
  }

  /**
   * Executes the statement and returns the key column of the last returned row, or null if there is none.
   */
  private static String readKey(PreparedStatement statement) throws SQLException
  {
    String value = null;
    try (ResultSet rs = statement.executeQuery()) {
      while (rs.next()) {
        value = rs.getString(KEY_COLUMN);
      }
    }
    return value;
  }

  /**
   * Returns the (lower, upper) key pair for the rows at offsets {@code lower} and {@code upper - 1}
   * (both inclusive) of a {@link JdbcPollInputOperator} partition. A bound is null when no row exists at that
   * offset.
   *
   * @throws RuntimeException wrapping any SQLException
   */
  private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException
  {
    String lowerBoundQuery = buildOffsetQuery(lower);
    String upperBoundQuery = buildOffsetQuery(upper - 1);

    try (Connection dbConnection = getDBConnection();
        PreparedStatement psLowerBound = dbConnection.prepareStatement(lowerBoundQuery);
        PreparedStatement psUpperBound = dbConnection.prepareStatement(upperBoundQuery)) {
      String lowerVal = readKey(psLowerBound);
      String upperVal = readKey(psUpperBound);
      return new KeyValPair<String, String>(lowerVal, upperVal);
    } catch (SQLException e) {
      LOG.error("Exception in getting bounds for queries", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns a map of partitionId to the (lower, upper) key pair of its range. The first
   * {@code numberOfPartitions - 1} partitions each cover {@code events} rows; the last partition covers every
   * remaining row. Offsets are computed in long arithmetic so that large tables do not overflow.
   *
   * @throws SQLException declared for callers; failures surface as RuntimeException
   */
  private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events,
      long rowCount) throws SQLException
  {
    HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap =
        new HashMap<Integer, KeyValPair<String, String>>();

    long lowerOffset = 0;
    for (int i = 0; i < numberOfPartitions - 1; i++, lowerOffset += events) {
      partitionToQueryMap.put(i, getQueryBounds(lowerOffset, lowerOffset + events));
    }

    //Call to construct the lower and upper bounds for the last partition
    partitionToQueryMap.put(numberOfPartitions - 1,
        getQueryBounds((long)events * (numberOfPartitions - 1), rowCount));

    LOG.info("Partition map - " + partitionToQueryMap.toString());

    return partitionToQueryMap;
  }

  /**
   * Starts a SELECT over the configured column list (or *) with the optional where clause, ending with
   * " WHERE " or " AND " so that the caller can append the key condition.
   */
  private static StringBuilder selectPrefix(String tableName)
  {
    StringBuilder sb = new StringBuilder("SELECT ");
    sb.append(COLUMN_LIST != null ? COLUMN_LIST : "*");
    sb.append(" from ").append(tableName).append(" WHERE ");
    if (WHERE_CLAUSE != null) {
      sb.append(WHERE_CLAUSE).append(" AND ");
    }
    return sb;
  }

  /**
   * Returns a range query over [lower, upper] (inclusive) of the key column.<br>
   * Invoked from the setup method of {@link AbstractJdbcPollInputOperator} to initialize the preparedStatement
   * of the given operator. Applies the optional where clause and column list, if set in this JVM.
   */
  public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper)
  {
    return selectPrefix(tableName)
        .append(keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'")
        .toString();
  }

  /**
   * Returns a query over (lower, upper] of the key column; used to resume a range partition after the
   * highest key already read when the operator is restarted.
   */
  public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper)
  {
    return selectPrefix(tableName)
        .append(keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'")
        .toString();
  }

  /**
   * Returns a polling query for keys strictly greater than {@code lower}, i.e. outside the given range.
   */
  public static String buildPollableQuery(String tableName, String keyColumn, String lower)
  {
    return selectPrefix(tableName)
        .append(keyColumn + " > '" + lower + "' ")
        .toString();
  }

  /**
   * Called by the partitioner of {@link AbstractJdbcPollInputOperator}. It stores the given settings in the
   * static fields, counts the matching rows and returns a map of partitionId to the (lower, upper) key range of
   * each partition.
   *
   * @throws SQLException declared for callers; database failures surface as RuntimeException
   */
  public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver,
      String dbConnection, String tableName, String key, String userName, String password, String whereClause,
      String columnList) throws SQLException
  {
    DB_CONNECTION = dbConnection;
    DB_USER = userName;
    DB_PASSWORD = password;
    TABLE_NAME = tableName;
    KEY_COLUMN = key;
    WHERE_CLAUSE = whereClause;
    COLUMN_LIST = columnList;
    DB_DRIVER = dbDriver;

    long rowCount = getRecordRange(generateQueryString());
    return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount);
  }

  /**
   * Returns the number of rows per partition for the first {@code partitions - 1} partitions.
   * It uses ceil(rowCount / partitions) so that the last partition gets the (smaller) remainder. If that would
   * leave the last partition empty (possible only for small tables), it falls back to floor(rowCount / partitions),
   * which gives the last partition the larger remainder instead. Assumes {@code partitions >= 1}; a table with
   * fewer rows than partitions cannot give every partition a row.
   */
  private static int getOffset(long rowCount, int partitions)
  {
    long perPartition = (rowCount + partitions - 1) / partitions;
    if (partitions > 1 && perPartition * (partitions - 1) >= rowCount) {
      perPartition = rowCount / partitions;
    }
    return (int)perPartition;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git
a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java new file mode 100644 index 0000000..45f96bf --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.annotation.OperatorAnnotation; + +/** + * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for + * consuming data from MySQL using JDBC interface <br> + * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key + * <br> + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * <br> + * This operator uses static partitioning to arrive at range queries for exactly + * once reads<br> + * Assumption is that there is an ordered column using which range queries can + * be formed<br> + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list<br> + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object> +{ + private long lastBatchWindowId; + private transient long currentWindowId; + private long lastCreationTsMillis; + private long fetchBackMillis = 0L; + private ArrayList<String> emitColumns; + private transient int count = 0; + + /** + * Returns the emit columns + */ + public List<String> getEmitColumns() + { + return emitColumns; + } + + /** + * Sets the emit columns + */ + public void setEmitColumns(ArrayList<String> emitColumns) + { + this.emitColumns = emitColumns; + } 
+ + /** + * Returns fetchBackMilis + */ + public long getFetchBackMillis() + { + return fetchBackMillis; + } + + /** + * Sets fetchBackMilis - used in polling + */ + public void setFetchBackMillis(long fetchBackMillis) + { + this.fetchBackMillis = fetchBackMillis; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + parseEmitColumnList(getEmitColumnList()); + lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis; + } + + private void parseEmitColumnList(String columnList) + { + String[] cols = columnList.split(","); + ArrayList<String> arr = Lists.newArrayList(); + for (int i = 0; i < cols.length; i++) { + arr.add(cols[i]); + } + setEmitColumns(arr); + } + + @Override + public void beginWindow(long l) + { + super.beginWindow(l); + currentWindowId = l; + } + + @Override + protected void pollRecords(PreparedStatement ps) + { + ResultSet rs = null; + List<Object> metaList = new ArrayList<>(); + + if (isReplayed) { + return; + } + + try { + if (ps.isClosed()) { + LOG.debug("Returning due to closed ps for non-pollable partitions"); + return; + } + } catch (SQLException e) { + LOG.error("Prepared statement is closed", e); + throw new RuntimeException(e); + } + + try (PreparedStatement pStat = ps;) { + pStat.setFetchSize(getFetchSize()); + LOG.debug("sql query = {}", pStat); + rs = pStat.executeQuery(); + boolean hasNext = false; + + if (rs == null || rs.isClosed()) { + return; + } + + while ((hasNext = rs.next())) { + Object key = null; + StringBuilder resultTuple = new StringBuilder(); + try { + if (count < getBatchSize()) { + key = rs.getObject(getKey()); + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); + } + metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); + count++; + } else { + emitQueue.add(metaList); + metaList = new ArrayList<>(); + key = rs.getObject(getKey()); + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); + } + 
metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); + count = 0; + } + } catch (NullPointerException npe) { + LOG.error("Key not found" + npe); + throw new RuntimeException(npe); + } + if (isPollable) { + highestPolled = key.toString(); + isPolled = true; + } + } + /*Flush the remaining records once the result set is over and batch-size is not reached, + * Dont flush if its pollable*/ + if (!hasNext) { + if ((isPollable && isPolled) || !isPollable) { + emitQueue.offer(metaList); + metaList = new ArrayList<>(); + count = 0; + } + if (!isPolled) { + isPolled = true; + } + } + LOG.debug("last creation time stamp = {}", lastCreationTsMillis); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void emitTuples() + { + if (isReplayed) { + LOG.debug( + "Calling emit tuples during window - " + currentWindowId + "::" + windowManager.getLargestRecoveryWindow()); + LOG.debug("Returning for replayed window"); + return; + } + + List<Object> tuples; + + if (lastBatchWindowId < currentWindowId) { + if ((tuples = emitQueue.poll()) != null) { + for (Object tuple : tuples) { + String[] str = tuple.toString().split(","); + if (lower == null) { + lower = str[0]; + } + upper = str[0]; + outputPort.emit(tuple); + } + lastBatchWindowId = currentWindowId; + } + } + } + + private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java new file mode 100644 index 0000000..573e45d --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.db.jdbc;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.commons.io.FileUtils;

import com.google.common.collect.Lists;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;

/**
 * Tests for {@link AbstractJdbcPollInputOperator} and
 * {@link JdbcPollInputOperator}
 */
public class JdbcPollerTest
{
  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";

  private static final String TABLE_NAME = "test_account_table";
  private static final String APP_ID = "JdbcPollingOperatorTest";
  public String dir = null;

  /**
   * Removes output left over from earlier runs and creates the test table.
   * The in-memory HSQLDB database outlives the closed connection because the URL
   * does not request shutdown on last close.
   */
  @BeforeClass
  public static void setup()
  {
    try {
      cleanup();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    try {
      Class.forName(DB_DRIVER).newInstance();

      try (Connection con = DriverManager.getConnection(URL);
          Statement stmt = con.createStatement()) {
        String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
            + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
        stmt.executeUpdate(createTable);
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Deletes the application directory written under target/.
   */
  @AfterClass
  public static void cleanup()
  {
    try {
      FileUtils.deleteDirectory(new File("target/" + APP_ID));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Deletes every row of the test table.
   */
  public static void cleanTable()
  {
    String cleanTable = "delete from " + TABLE_NAME;
    try (Connection con = DriverManager.getConnection(URL);
        Statement stmt = con.createStatement()) {
      stmt.executeUpdate(cleanTable);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Inserts {@code numEvents} rows whose Account_No runs from {@code offset}
   * upwards.
   */
  public static void insertEventsInTable(int numEvents, int offset)
  {
    String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
    try (Connection con = DriverManager.getConnection(URL);
        PreparedStatement stmt = con.prepareStatement(insert)) {
      for (int i = 0; i < numEvents; i++, offset++) {
        stmt.setInt(1, offset);
        stmt.setString(2, "Account_Holder-" + offset);
        stmt.setInt(3, (offset * 1000));
        stmt.executeUpdate();
      }
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Asserts that the first comma separated field of the collected tuples counts
   * up by one, starting at {@code firstKey}.
   */
  private static void assertConsecutiveKeys(CollectorTestSink<Object> sink, int firstKey)
  {
    int expected = firstKey;
    for (Object tuple : sink.collectedTuples) {
      String[] pojoEvent = tuple.toString().split(",");
      Assert.assertEquals("i=" + expected, expected, Integer.parseInt(pojoEvent[0]));
      expected++;
    }
  }

  /**
   * Simulates the actual application flow. Partitioning yields a batch query
   * partition and a pollable partition, and incremental record polling is
   * checked on the pollable one.
   */
  @Test
  public void testJdbcPollingInputOperatorBatch() throws InterruptedException
  {
    cleanTable();
    insertEventsInTable(10, 0);
    JdbcStore store = new JdbcStore();
    store.setDatabaseDriver(DB_DRIVER);
    store.setDatabaseUrl(URL);

    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
    this.dir = "target/" + APP_ID + "/";
    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);

    JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
    inputOperator.setStore(store);
    inputOperator.setBatchSize(100);
    inputOperator.setPollInterval(1000);
    inputOperator.setEmitColumnList("Account_No,Name,Amount");
    inputOperator.setKey("Account_No");
    inputOperator.setTableName(TABLE_NAME);
    inputOperator.setFetchSize(100);
    inputOperator.setPartitionCount(1);

    CollectorTestSink<Object> sink = new CollectorTestSink<>();
    inputOperator.outputPort.setSink(sink);

    TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2);

    DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition =
        new DefaultPartition<AbstractJdbcPollInputOperator<Object>>(inputOperator);

    TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> pseudoPartition =
        new TestUtils.MockPartition<>(apartition, readerStats);

    List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newMocks = Lists.newArrayList();
    newMocks.add(pseudoPartition);

    Collection<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions =
        inputOperator.definePartitions(newMocks, null);

    Iterator<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions.iterator();

    int operatorId = 0;
    for (Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap =
          new Attribute.AttributeMap.DefaultAttributeMap();
      this.dir = "target/" + APP_ID + "/";
      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);

      OperatorContextTestHelper.TestIdOperatorContext partitioningContext =
          new OperatorContextTestHelper.TestIdOperatorContext(operatorId++, partitionAttributeMap);

      partition.getPartitionedInstance().setup(partitioningContext);
      partition.getPartitionedInstance().activate(partitioningContext);
    }

    //First partition is for range queries,last is for polling queries
    AbstractJdbcPollInputOperator<Object> newInstance = itr.next().getPartitionedInstance();
    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
    newInstance.outputPort.setSink(sink1);
    newInstance.beginWindow(1);
    Thread.sleep(50);
    newInstance.emitTuples();
    newInstance.endWindow();

    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
    assertConsecutiveKeys(sink1, 0);
    sink1.collectedTuples.clear();

    insertEventsInTable(10, 10);

    AbstractJdbcPollInputOperator<Object> pollableInstance = itr.next().getPartitionedInstance();

    pollableInstance.outputPort.setSink(sink1);

    pollableInstance.beginWindow(1);
    Thread.sleep(pollableInstance.getPollInterval());
    pollableInstance.emitTuples();
    pollableInstance.endWindow();

    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
    assertConsecutiveKeys(sink1, 10);
    sink1.collectedTuples.clear();

    insertEventsInTable(10, 20);

    pollableInstance.beginWindow(2);
    Thread.sleep(pollableInstance.getPollInterval());
    pollableInstance.emitTuples();
    pollableInstance.endWindow();

    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
    assertConsecutiveKeys(sink1, 20);
    sink1.collectedTuples.clear();
  }

}
