[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15373298#comment-15373298 ]
ASF GitHub Bot commented on APEXMALHAR-2066: -------------------------------------------- Github user devtagare commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70483252 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * A utility class used to retrieve the metadata for a given unique key of a SQL + * table. 
This class would emit range queries based on a primary index given + * + * @Input - dbName,tableName, primaryKey + * @Output - map<operatorId,prepared statement> + * + */ +public class JdbcMetaDataUtility +{ + private static String DB_DRIVER = "com.mysql.jdbc.Driver"; + private static String DB_CONNECTION = ""; + private static String DB_USER = ""; + private static String DB_PASSWORD = ""; + private static String TABLE_NAME = ""; + private static String KEY_COLUMN = ""; + private static String WHERE_CLAUSE = null; + private static String COLUMN_LIST = null; + + private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class); + + public JdbcMetaDataUtility() + { + + } + + public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password) + { + DB_CONNECTION = dbConnection; + DB_USER = userName; + DB_PASSWORD = password; + TABLE_NAME = tableName; + KEY_COLUMN = key; + } + + private static Connection getDBConnection() + { + + Connection dbConnection = null; + + try { + Class.forName(DB_DRIVER); + } catch (ClassNotFoundException e) { + LOG.error("Driver not found", e); + } + + try { + dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD); + return dbConnection; + } catch (SQLException e) { + LOG.error("Exception in getting connection handle", e); --- End diff -- Done > Add jdbc poller input operator > ------------------------------ > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Ashwin Chandra Putta > Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. 
> Assumptions for idempotency & partitioning: > 1. User needs to provide tableName, dbConnection, setEmitColumnList, look-up key. > 2. Optionally batchSize, pollInterval, look-up key and a where clause can be > given. > 3. This operator uses static partitioning to arrive at range queries for > exactly-once reads. > 4. Assumption is that there is an ordered column using which range queries can > be formed. > 5. If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list. > 6. Range queries are formed using the JdbcMetaDataUtility. Output - comma > separated list of the emit columns, e.g. columnA,columnB,columnC > Per window, the first and the last key processed are saved using the > FSWindowDataManager - (<lowerBound,upperBound>,operatorId,windowId). This > (lowerBound, upperBound) pair is then used for recovery. The queries are > constructed using the JdbcMetaDataUtility. > JdbcMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. This class would emit range queries based on a primary index given. -- This message was sent by Atlassian JIRA (v6.3.4#6332)