[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321677#comment-15321677 ]
ASF GitHub Bot commented on APEXMALHAR-2066: -------------------------------------------- Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r66363085 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java --- @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; + +/** + * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for + * consuming data from MySQL using JDBC interface <br> + * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key + * <br> + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * <br> + * This operator uses static partitioning to arrive at range queries for exactly + * once reads<br> + * Assumption is that there is an ordered column using which range queries can + * be formed<br> + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list<br> + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc + */ +public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object> +{ + private long lastBatchWindowId; + private transient long currentWindowId; + private long lastCreationTsMillis; + private long fetchBackMillis = 0L; + private ArrayList<String> emitColumns; + private transient int count = 0; + + /** + * Returns the emit columns + */ + public List<String> getEmitColumns() + { + return emitColumns; + } + + /** + * Sets the emit columns + */ + public void setEmitColumns(ArrayList<String> emitColumns) + { + this.emitColumns = emitColumns; + } + + /** + * Returns fetchBackMilis + */ + public long getFetchBackMillis() + { + return fetchBackMillis; + } + + /** + * Sets fetchBackMilis - used in polling + */ + public void setFetchBackMillis(long 
fetchBackMillis) + { + this.fetchBackMillis = fetchBackMillis; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + parseEmitColumnList(getEmitColumnList()); + lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis; + } + + private void parseEmitColumnList(String columnList) + { + String[] cols = columnList.split(","); + ArrayList<String> arr = Lists.newArrayList(); + for (int i = 0; i < cols.length; i++) { + arr.add(cols[i]); + } + setEmitColumns(arr); + } + + @Override + public void beginWindow(long l) + { + super.beginWindow(l); + currentWindowId = l; + } + + @Override + protected void pollRecords(PreparedStatement ps) + { + ResultSet rs = null; + List<Object> metaList = new ArrayList<>(); + + if (isReplayed) { + return; + } + + try { + if (ps.isClosed()) { + LOG.debug("Returning due to closed ps for non-pollable partitions"); + return; + } + } catch (SQLException e) { + LOG.error("Prepared statement is closed", e); + throw new RuntimeException(e); + } + + try (PreparedStatement pStat = ps;) { + pStat.setFetchSize(getFetchSize()); + LOG.debug("sql query = {}", pStat); + rs = pStat.executeQuery(); + boolean hasNext = false; + + if (rs == null || rs.isClosed()) { + return; + } + + while ((hasNext = rs.next())) { + Object key = null; + StringBuilder resultTuple = new StringBuilder(); + try { + if (count < getBatchSize()) { + key = rs.getObject(getKey()); + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); + } + metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); + count++; + } else { + emitQueue.add(metaList); + metaList = new ArrayList<>(); + key = rs.getObject(getKey()); + for (String obj : emitColumns) { + resultTuple.append(rs.getObject(obj) + ","); + } + metaList.add(resultTuple.substring(0, resultTuple.length() - 1)); + count = 0; + } + } catch (NullPointerException npe) { + LOG.error("Key not found" + npe); + throw new RuntimeException(npe); + } + if (isPollable) 
{ + highestPolled = key.toString(); + isPolled = true; + } + } + /*Flush the remaining records once the result set is over and batch-size is not reached, + * Dont flush if its pollable*/ + if (!hasNext) { + if ((isPollable && isPolled) || !isPollable) { + emitQueue.add(metaList); --- End diff -- This might throw an exception if the queue is full. Use offer() instead? > Add jdbc poller input operator > ------------------------------ > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Ashwin Chandra Putta > Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning: > 1. User needs to provide tableName, dbConnection, setEmitColumnList, look-up key. > 2. Optionally batchSize, pollInterval, look-up key and a where clause can be > given. > 3. This operator uses static partitioning to arrive at range queries for > exactly once reads. > 4. Assumption is that there is an ordered column using which range queries can > be formed. > 5. If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list. > 6. Range queries are formed using the JdbcMetaDataUtility. Output - comma > separated list of the emit columns, e.g. columnA,columnB,columnC > Per window, the first and the last key processed are saved using the > FSWindowDataManager - (<lowerBound,UpperBound>,operatorId,windowId). This > (lowerBound, upperBound) pair is then used for recovery. The queries are > constructed using the JdbcMetaDataUtility. > JdbcMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. 
This class would emit range queries based on a primary index given. -- This message was sent by Atlassian JIRA (v6.3.4#6332)