[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302988#comment-15302988 ]
ASF GitHub Bot commented on APEXMALHAR-2066: -------------------------------------------- Github user gauravgopi123 commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r64825921 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for consuming data using JDBC interface<br> + * User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key <br> + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * <br> + * This operator uses static partitioning to arrive at range queries for exactly + * once reads<br> + * Assumption is that there is an ordered column using which range queries can + * be formed<br> + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list<br> + * Range queries are formed using the {@link JdbcMetaDataUtility} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * 
@tags database, sql, jdbc, partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> + implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>> +{ + /* + * poll interval in milliseconds + */ + private int pollInterval; + + @Min(1) + private int partitionCount = 1; + protected transient int operatorId; + protected transient boolean isReplayed; + protected transient boolean isPollable; + protected int batchSize; + protected int fetchSize; + /** + * Map of windowId to <lower bound,upper bound> of the range key + */ + protected transient MutablePair<String, String> currentWindowRecoveryState; + + /** + * size of the emit queue used to hold polled records before emit + */ + private int queueCapacity = 4 * 1024 * 1024; + private transient volatile boolean execute; + private transient AtomicReference<Throwable> cause; + protected transient int spinMillis; + private transient OperatorContext context; + protected String tableName; + protected String key; + protected long currentWindowId; + protected KeyValPair<String, String> rangeQueryPair; + protected String lower; + protected String upper; + protected boolean recovered; + protected boolean isPolled; + protected String whereCondition = null; + protected String previousUpperBound; + protected String highestPolled; + private static final String user = "user"; + private static final String password = "password"; + /** + * thread to poll database + */ + private transient Thread dbPoller; + protected transient ArrayBlockingQueue<List<T>> emitQueue; + protected transient PreparedStatement ps; + protected WindowDataManager windowManager; + private String emitColumnList; + + /** + * Returns the where clause + */ + public String getWhereCondition() + { + return whereCondition; + } + + /** + * Sets the where clause + */ + public void setWhereCondition(String whereCondition) + { + this.whereCondition = 
whereCondition; + } + + /** + * Returns the list of columns to select from the query + */ + public String getEmitColumnList() + { + return emitColumnList; + } + + /** + * Comma separated list of columns to select from the given table + */ + public void setEmitColumnList(String emitColumnList) + { + this.emitColumnList = emitColumnList; + } + + /** + * Returns the fetchsize for getting the results + */ + public int getFetchSize() + { + return fetchSize; + } + + /** + * Sets the fetchsize for getting the results + */ + public void setFetchSize(int fetchSize) + { + this.fetchSize = fetchSize; + } + + protected abstract void pollRecords(PreparedStatement ps); + + /** + * Returns the interval for polling the queue + */ + public int getPollInterval() + { + return pollInterval; + } + + /** + * Sets the interval for polling the emit queue + */ + public void setPollInterval(int pollInterval) + { + this.pollInterval = pollInterval; + } + + /** + * Returns the capacity of the emit queue + */ + public int getQueueCapacity() + { + return queueCapacity; + } + + /** + * Sets the capacity of the emit queue + */ + public void setQueueCapacity(int queueCapacity) + { + this.queueCapacity = queueCapacity; + } + + /** + * Returns the ordered key used to generate the range queries + */ + public String getKey() + { + return key; + } + + /** + * Sets the ordered key used to generate the range queries + */ + public void setKey(String key) + { + this.key = key; + } + + /** + * Returns the tableName which would be queried + */ + public String getTableName() + { + return tableName; + } + + /** + * Sets the tableName to query + */ + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + /** + * Returns rangeQueryPair - <lowerBound,upperBound> + */ + public KeyValPair<String, String> getRangeQueryPair() + { + return rangeQueryPair; + } + + /** + * Sets the rangeQueryPair <lowerBound,upperBound> + */ + public void setRangeQueryPair(KeyValPair<String, String> 
rangeQueryPair) + { + this.rangeQueryPair = rangeQueryPair; + } + + /** + * Returns batchSize indicating the number of elements in emitQueue + */ + public int getBatchSize() + { + return batchSize; + } + + /** + * Sets batchSize for number of elements in the emitQueue + */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + public AbstractJdbcPollInputOperator() + { + this.pollInterval = 10 * 1000; + this.fetchSize = 10000; + currentWindowRecoveryState = new MutablePair<>(); + windowManager = new FSWindowDataManager(); + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + execute = true; + cause = new AtomicReference<Throwable>(); + emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity); + this.context = context; + operatorId = context.getId(); + + try { + + //If its a range query pass upper and lower bounds + //If its a polling query pass only the lower bound + if (getRangeQueryPair().getValue() != null) { + ps = store.getConnection() + .prepareStatement( + JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(), + rangeQueryPair.getValue()), + java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + } else { + ps = store.getConnection().prepareStatement( + JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()), + java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + isPollable = true; + } + + } catch (SQLException e) { + LOG.error("Exception in initializing the range query for a given partition", e); + throw new RuntimeException(); + } + + windowManager.setup(context); + LOG.debug("super setup done..."); + } + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + + isReplayed = false; + + if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { + try { + 
replay(currentWindowId); + } catch (SQLException e) { + LOG.error("Exception in replayed windows", e); + throw new RuntimeException(e); + } + } + + if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) { + try { + if (!isPollable && rangeQueryPair.getValue() != null) { + + ps = store.getConnection().prepareStatement( + JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound, + rangeQueryPair.getValue()), + java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + } else { + String bound = null; + if (previousUpperBound == null) { + bound = getRangeQueryPair().getKey(); + } else { + bound = previousUpperBound; + } + ps = store.getConnection().prepareStatement( + JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound), + java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + isPollable = true; + } + isReplayed = false; + LOG.debug("Prepared statement after re-initialization - " + ps.toString()); + } catch (SQLException e) { + // TODO Auto-generated catch block --- End diff -- remove this > Add jdbc poller input operator > ------------------------------ > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Ashwin Chandra Putta > Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)