[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321677#comment-15321677
 ] 

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r66363085
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---
    @@ -0,0 +1,226 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +
    +/**
    + * A concrete implementation for {@link AbstractJdbcPollInputOperator}} for
    + * consuming data from MySQL using JDBC interface <br>
    + * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
    + * <br>
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
    + * <br>
    + * This operator uses static partitioning to arrive at range queries for exactly
    + * once reads<br>
    + * Assumption is that there is an ordered column using which range queries can
    + * be formed<br>
    + * If an emitColumnList is provided, please ensure that the keyColumn is the
    + * first column in the list<br>
    + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    + * comma separated list of the emit columns eg columnA,columnB,columnC
    + * 
    + * @displayName Jdbc Polling Input Operator
    + * @category Input
    + * @tags database, sql, jdbc
    + */
    +public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object>
    +{
    +  private long lastBatchWindowId;
    +  private transient long currentWindowId;
    +  private long lastCreationTsMillis;
    +  private long fetchBackMillis = 0L;
    +  private ArrayList<String> emitColumns;
    +  private transient int count = 0;
    +
    +  /**
    +   * Returns the emit columns
    +   */
    +  public List<String> getEmitColumns()
    +  {
    +    return emitColumns;
    +  }
    +
    +  /**
    +   * Sets the emit columns
    +   */
    +  public void setEmitColumns(ArrayList<String> emitColumns)
    +  {
    +    this.emitColumns = emitColumns;
    +  }
    +
    +  /**
    +   * Returns fetchBackMilis
    +   */
    +  public long getFetchBackMillis()
    +  {
    +    return fetchBackMillis;
    +  }
    +
    +  /**
    +   * Sets fetchBackMilis - used in polling
    +   */
    +  public void setFetchBackMillis(long fetchBackMillis)
    +  {
    +    this.fetchBackMillis = fetchBackMillis;
    +  }
    +
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    super.setup(context);
    +    parseEmitColumnList(getEmitColumnList());
    +    lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
    +  }
    +
    +  private void parseEmitColumnList(String columnList)
    +  {
    +    String[] cols = columnList.split(",");
    +    ArrayList<String> arr = Lists.newArrayList();
    +    for (int i = 0; i < cols.length; i++) {
    +      arr.add(cols[i]);
    +    }
    +    setEmitColumns(arr);
    +  }
    +
    +  @Override
    +  public void beginWindow(long l)
    +  {
    +    super.beginWindow(l);
    +    currentWindowId = l;
    +  }
    +
    +  @Override
    +  protected void pollRecords(PreparedStatement ps)
    +  {
    +    ResultSet rs = null;
    +    List<Object> metaList = new ArrayList<>();
    +
    +    if (isReplayed) {
    +      return;
    +    }
    +
    +    try {
    +      if (ps.isClosed()) {
    +        LOG.debug("Returning due to closed ps for non-pollable partitions");
    +        return;
    +      }
    +    } catch (SQLException e) {
    +      LOG.error("Prepared statement is closed", e);
    +      throw new RuntimeException(e);
    +    }
    +
    +    try (PreparedStatement pStat = ps;) {
    +      pStat.setFetchSize(getFetchSize());
    +      LOG.debug("sql query = {}", pStat);
    +      rs = pStat.executeQuery();
    +      boolean hasNext = false;
    +
    +      if (rs == null || rs.isClosed()) {
    +        return;
    +      }
    +
    +      while ((hasNext = rs.next())) {
    +        Object key = null;
    +        StringBuilder resultTuple = new StringBuilder();
    +        try {
    +          if (count < getBatchSize()) {
    +            key = rs.getObject(getKey());
    +            for (String obj : emitColumns) {
    +              resultTuple.append(rs.getObject(obj) + ",");
    +            }
    +            metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
    +            count++;
    +          } else {
    +            emitQueue.add(metaList);
    +            metaList = new ArrayList<>();
    +            key = rs.getObject(getKey());
    +            for (String obj : emitColumns) {
    +              resultTuple.append(rs.getObject(obj) + ",");
    +            }
    +            metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
    +            count = 0;
    +          }
    +        } catch (NullPointerException npe) {
    +          LOG.error("Key not found" + npe);
    +          throw new RuntimeException(npe);
    +        }
    +        if (isPollable) {
    +          highestPolled = key.toString();
    +          isPolled = true;
    +        }
    +      }
    +      /*Flush the remaining records once the result set is over and batch-size is not reached,
    +       * Dont flush if its pollable*/
    +      if (!hasNext) {
    +        if ((isPollable && isPolled) || !isPollable) {
    +          emitQueue.add(metaList);
    --- End diff --
    
    This might throw an exception if the queue is full. Use offer() instead?
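
    To illustrate the concern, here is a minimal standalone sketch. It assumes emitQueue is a capacity-bounded BlockingQueue; the diff does not show its declaration, so the ArrayBlockingQueue of capacity 1 below is only a stand-in, and the class and method names are hypothetical. On a full bounded queue, add() throws IllegalStateException, while offer() returns false and the timed offer() waits up to the given timeout before returning false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the pull request.
class QueueOfferSketch
{
  // Assumption: a bounded queue of capacity 1 stands in for emitQueue.
  static BlockingQueue<String> fullQueue()
  {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.add("first"); // the queue is now at capacity
    return queue;
  }

  // Returns true if add() on the given queue throws IllegalStateException.
  static boolean addThrows(BlockingQueue<String> queue)
  {
    try {
      queue.add("second");
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    BlockingQueue<String> queue = fullQueue();

    // add() signals a full queue with an exception.
    System.out.println("add threw: " + addThrows(queue));

    // offer() signals a full queue with a false return value instead.
    System.out.println("offer returned: " + queue.offer("second"));

    // The timed variant waits for space before giving up.
    System.out.println("timed offer returned: " + queue.offer("second", 10, TimeUnit.MILLISECONDS));
  }
}
```

    Note that offer() only moves the problem: the caller still has to check the return value and decide whether to retry, block, or drop the batch. If the queue is unbounded, add() and offer() behave the same and the concern does not apply.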


> Add jdbc poller input operator
> ------------------------------
>
>                 Key: APEXMALHAR-2066
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Ashwin Chandra Putta
>            Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. poll from external jdbc store asynchronously in the input operator.
> 2. polling frequency and batch size should be configurable.
> 3. should be idempotent.
> 4. should be partition-able.
> 5. should be batch + polling capable.
> Assumptions for idempotency and partitioning:
> 1. The user needs to provide tableName, dbConnection, setEmitColumnList and 
> the look-up key.
> 2. Optionally, batchSize, pollInterval, look-up key and a where clause can be 
> given.
> 3. This operator uses static partitioning to arrive at range queries for 
> exactly-once reads.
> 4. The assumption is that there is an ordered column with which range queries 
> can be formed.
> 5. If an emitColumnList is provided, please ensure that the keyColumn is the 
> first column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility. Output: a 
> comma-separated list of the emit columns, e.g. columnA,columnB,columnC.
> Per window, the first and the last key processed are saved using the 
> FSWindowDataManager as (<lowerBound,UpperBound>,operatorId,windowId). This 
> (lowerBound,upperBound) pair is then used for recovery. The queries are 
> constructed using the JDBCMetaDataUtility.
> JDBCMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL 
> table. This class would emit range queries based on a given primary index.
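
The static partitioning into range queries described in the issue can be sketched roughly as follows. This is not the JdbcMetaDataUtility implementation: the class and method names are hypothetical, the key is assumed to be a numeric column whose minimum and maximum values are already known, and the SQL is assembled by string concatenation purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the JdbcMetaDataUtility from the pull request.
class RangeQuerySketch
{
  /**
   * Splits the inclusive key range [minKey, maxKey] into at most
   * `partitions` contiguous, non-overlapping ranges and renders one
   * range query per partition, ordered by the key column.
   */
  static List<String> rangeQueries(String table, String keyColumn, long minKey, long maxKey, int partitions)
  {
    if (partitions <= 0 || minKey > maxKey) {
      throw new IllegalArgumentException("need partitions > 0 and minKey <= maxKey");
    }
    List<String> queries = new ArrayList<>();
    long total = maxKey - minKey + 1;
    long step = (total + partitions - 1) / partitions; // ceiling division
    for (long lower = minKey; lower <= maxKey; lower += step) {
      long upper = Math.min(lower + step - 1, maxKey);
      queries.add("SELECT * FROM " + table
          + " WHERE " + keyColumn + " BETWEEN " + lower + " AND " + upper
          + " ORDER BY " + keyColumn);
    }
    return queries;
  }

  public static void main(String[] args)
  {
    // Keys 1..10 over 3 partitions give the ranges 1-4, 5-8 and 9-10.
    for (String query : rangeQueries("orders", "id", 1, 10, 3)) {
      System.out.println(query);
    }
  }
}
```

Because the ranges do not overlap and each query is ordered by the key, every partition reads a fixed slice of the table, which is what allows a saved (lowerBound,upperBound) pair to be replayed for a window during recovery.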



