[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321693#comment-15321693 ]
ASF GitHub Bot commented on APEXMALHAR-2066: -------------------------------------------- Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66365124 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * A utility class used to retrieve the metadata for a given unique key of a SQL + * table. 
This class would emit range queries based on a primary index given + * + * @Input - dbName,tableName, primaryKey + * @Output - map<operatorId,prepared statement> + * + */ +public class JdbcMetaDataUtility +{ + private static String DB_DRIVER = "com.mysql.jdbc.Driver"; + private static String DB_CONNECTION = ""; + private static String DB_USER = ""; + private static String DB_PASSWORD = ""; + private static String TABLE_NAME = ""; + private static String KEY_COLUMN = ""; + private static String WHERE_CLAUSE = null; + private static String COLUMN_LIST = null; + + private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); + + public JdbcMetaDataUtility() + { + + } + + public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) + { + DB_CONNECTION = dbConnection; + DB_USER = userName; + DB_PASSWORD = password; + TABLE_NAME = tableName; + KEY_COLUMN = key; + } + + private static Connection getDBConnection() + { + + Connection dbConnection = null; + + try { + Class.forName(DB_DRIVER); + } catch (ClassNotFoundException e) { + LOG.error("Driver not found", e); + } + + try { + dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + return dbConnection; + } catch (SQLException e) { + LOG.error("Exception in getting connection handle", e); + } + + return dbConnection; + + } + + private static String generateQueryString() + { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME); + + if (WHERE_CLAUSE != null) { + sb.append(" WHERE " + WHERE_CLAUSE); + } + + return sb.toString(); + } + + /** + * Finds the total number of rows in the table + */ + private static long getRecordRange(String query) throws SQLException + { + long rowCount = 0; + Connection dbConnection = null; + PreparedStatement preparedStatement = null; + + try { + dbConnection = getDBConnection(); + preparedStatement = dbConnection.prepareStatement(query); + + 
ResultSet rs = preparedStatement.executeQuery(); + + while (rs.next()) { + rowCount = Long.parseLong(rs.getString("RowCount")); + LOG.info("# Rows - " + rowCount); + } + + } catch (SQLException e) { + LOG.error("Exception in retreiving result set", e); + } finally { + if (preparedStatement != null) { + preparedStatement.close(); + } + if (dbConnection != null) { + dbConnection.close(); + } + } + return rowCount; + } + + /** + * Returns a pair of <upper,lower> bounds for each partition of the + * {@link JdbcPollInputOperator}} + */ + private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException + { + Connection dbConnectionLower = null; + Connection dbConnectionUpper = null; + PreparedStatement psLowerBound = null; + PreparedStatement psUpperBound = null; + + StringBuilder lowerBound = new StringBuilder(); + StringBuilder upperBound = new StringBuilder(); + + KeyValPair<String, String> boundedQUeryPair = null; + + try { + dbConnectionLower = getDBConnection(); + dbConnectionUpper = getDBConnection(); --- End diff -- Do we need two database connections? > Add jdbc poller input operator > ------------------------------ > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Ashwin Chandra Putta > Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. > 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. 
> 3. This operator uses static partitioning to arrive at range queries for > exactly-once reads. > 4. The assumption is that there is an ordered column with which range queries can > be formed. > 5. If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list. > 6. Range queries are formed using the JdbcMetaDataUtility. Output - comma > separated list of the emit columns, e.g. columnA,columnB,columnC > Per window, the first and the last key processed are saved using the > FSWindowDataManager - (<lowerBound,upperBound>,operatorId,windowId). This > (lowerBound,upperBound) pair is then used for recovery. The queries are > constructed using the JdbcMetaDataUtility. > JdbcMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. This class would emit range queries based on a primary index given. -- This message was sent by Atlassian JIRA (v6.3.4#6332)