[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374441#comment-15374441
 ] 

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r70569998
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
    @@ -0,0 +1,656 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import javax.validation.constraints.Min;
    +
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
    +import org.apache.apex.malhar.lib.wal.WindowDataManager;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import org.apache.hadoop.classification.InterfaceStability.Evolving;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DefaultPartition;
    +import com.datatorrent.api.Operator.ActivationListener;
    +import com.datatorrent.api.Operator.IdleTimeHandler;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.lib.db.AbstractStoreInputOperator;
    +import com.datatorrent.lib.util.KeyValPair;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract operator for for consuming data using JDBC interface<br>
    + * User needs User needs to provide
    + * tableName,dbConnection,setEmitColumnList,look-up key <br>
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be 
given
    + * <br>
    + * This operator uses static partitioning to arrive at range queries for 
exactly
    + * once reads<br>
    + * Assumption is that there is an ordered column using which range queries 
can
    + * be formed<br>
    + * If an emitColumnList is provided, please ensure that the keyColumn is 
the
    + * first column in the list<br>
    + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    + * comma separated list of the emit columns eg columnA,columnB,columnC<br>
    + * 
    + * In the next iterations this operator would support an in-clause for
    + * idempotency instead of having only range query support to support non 
ordered
    + * key columns
    + * 
    + * @displayName Jdbc Polling Input Operator
    + * @category Input
    + * @tags database, sql, jdbc, partitionable,exactlyOnce
    + */
    +@Evolving
    +public abstract class AbstractJdbcPollInputOperator<T> extends 
AbstractStoreInputOperator<T, JdbcStore>
    +    implements ActivationListener<Context>, IdleTimeHandler, 
Partitioner<AbstractJdbcPollInputOperator<T>>
    +{
    +  /**
    +   * poll interval in milliseconds
    +   */
    +  private static int pollInterval = 10000;
    +
    +  @Min(1)
    +  private int partitionCount = 1;
    +  protected transient int operatorId;
    +  protected transient boolean isReplayed;
    +  protected transient boolean isPollable;
    +  protected int batchSize;
    +  protected static int fetchSize = 20000;
    +  /**
    +   * Map of windowId to <lower bound,upper bound> of the range key
    +   */
    +  protected transient MutablePair<String, String> 
currentWindowRecoveryState;
    +
    +  /**
    +   * size of the emit queue used to hold polled records before emit
    +   */
    +  private static int queueCapacity = 4 * 1024 * 1024;
    +  private transient volatile boolean execute;
    +  private transient AtomicReference<Throwable> cause;
    +  protected transient int spinMillis;
    +  private transient OperatorContext context;
    +  protected String tableName;
    +  protected String key;
    +  protected long currentWindowId;
    +  protected KeyValPair<String, String> rangeQueryPair;
    +  protected String lower;
    +  protected String upper;
    +  protected boolean recovered;
    +  protected boolean isPolled;
    +  protected String whereCondition = null;
    +  protected String previousUpperBound;
    +  protected String highestPolled;
    +  private static final String user = "";
    +  private static final String password = "";
    +  /**
    +   * thread to poll database
    +   */
    +  private transient Thread dbPoller;
    +  protected transient ArrayBlockingQueue<List<T>> emitQueue;
    +  protected transient PreparedStatement ps;
    +  protected WindowDataManager windowManager;
    +  private String emitColumnList;
    +
    +  /**
    +   * Returns the where clause
    +   */
    +  public String getWhereCondition()
    +  {
    +    return whereCondition;
    +  }
    +
    +  /**
    +   * Sets the where clause
    +   */
    +  public void setWhereCondition(String whereCondition)
    +  {
    +    this.whereCondition = whereCondition;
    +  }
    +
    +  /**
    +   * Returns the list of columns to select from the query
    +   */
    +  public String getEmitColumnList()
    +  {
    +    return emitColumnList;
    +  }
    +
    +  /**
    +   * Comma separated list of columns to select from the given table
    +   */
    +  public void setEmitColumnList(String emitColumnList)
    +  {
    +    this.emitColumnList = emitColumnList;
    +  }
    +
    +  /**
    +   * Returns the fetchsize for getting the results
    +   */
    +  public int getFetchSize()
    +  {
    +    return fetchSize;
    +  }
    +
    +  /**
    +   * Sets the fetchsize for getting the results
    +   */
    +  public void setFetchSize(int fetchSize)
    +  {
    +    this.fetchSize = fetchSize;
    +  }
    +
    +  protected abstract void pollRecords(PreparedStatement ps);
    +
    +  /**
    +   * Returns the interval for polling the queue
    +   */
    +  public int getPollInterval()
    +  {
    +    return pollInterval;
    +  }
    +
    +  /**
    +   * Sets the interval for polling the emit queue
    +   */
    +  public void setPollInterval(int pollInterval)
    +  {
    +    this.pollInterval = pollInterval;
    +  }
    +
    +  /**
    +   * Returns the capacity of the emit queue
    +   */
    +  public int getQueueCapacity()
    +  {
    +    return queueCapacity;
    +  }
    +
    +  /**
    +   * Sets the capacity of the emit queue
    +   */
    +  public void setQueueCapacity(int queueCapacity)
    +  {
    +    this.queueCapacity = queueCapacity;
    +  }
    +
    +  /**
    +   * Returns the ordered key used to generate the range queries
    +   */
    +  public String getKey()
    +  {
    +    return key;
    +  }
    +
    +  /**
    +   * Sets the ordered key used to generate the range queries
    +   */
    +  public void setKey(String key)
    +  {
    +    this.key = key;
    +  }
    +
    +  /**
    +   * Returns the tableName which would be queried
    +   */
    +  public String getTableName()
    +  {
    +    return tableName;
    +  }
    +
    +  /**
    +   * Sets the tableName to query
    +   */
    +  public void setTableName(String tableName)
    +  {
    +    this.tableName = tableName;
    +  }
    +
    +  /**
    +   * Returns rangeQueryPair - <lowerBound,upperBound>
    +   */
    +  public KeyValPair<String, String> getRangeQueryPair()
    +  {
    +    return rangeQueryPair;
    +  }
    +
    +  /**
    +   * Sets the rangeQueryPair <lowerBound,upperBound>
    +   */
    +  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
    +  {
    +    this.rangeQueryPair = rangeQueryPair;
    +  }
    +
    +  /**
    +   * Returns batchSize indicating the number of elements in emitQueue
    +   */
    +  public int getBatchSize()
    +  {
    +    return batchSize;
    +  }
    +
    +  /**
    +   * Sets batchSize for number of elements in the emitQueue
    +   */
    +  public void setBatchSize(int batchSize)
    +  {
    +    this.batchSize = batchSize;
    +  }
    +
    +  public AbstractJdbcPollInputOperator()
    +  {
    +    currentWindowRecoveryState = new MutablePair<>();
    +    windowManager = new FSWindowDataManager();
    +  }
    +
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    super.setup(context);
    +    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    execute = true;
    +    cause = new AtomicReference<Throwable>();
    +    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    +    this.context = context;
    +    operatorId = context.getId();
    +
    +    try {
    +
    +      //If its a range query pass upper and lower bounds
    +      //If its a polling query pass only the lower bound
    +      if (getRangeQueryPair().getValue() != null) {
    +        ps = store.getConnection()
    +            .prepareStatement(
    +                JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
    +                    rangeQueryPair.getValue()),
    +                java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +      } else {
    +        ps = store.getConnection().prepareStatement(
    +            JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
    +            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +        isPollable = true;
    +      }
    +
    +    } catch (SQLException e) {
    +      LOG.error("Exception in initializing the range query for a given 
partition", e);
    +      throw new RuntimeException(e);
    +    }
    +
    +    windowManager.setup(context);
    +    LOG.debug("super setup done...");
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    currentWindowId = windowId;
    +
    +    isReplayed = false;
    +
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
    +      try {
    +        replay(currentWindowId);
    +      } catch (SQLException e) {
    +        LOG.error("Exception in replayed windows", e);
    +        throw new RuntimeException(e);
    +      }
    +    }
    +
    +    if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
    +      try {
    +        if (!isPollable && rangeQueryPair.getValue() != null) {
    +
    +          ps = store.getConnection().prepareStatement(
    +              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
    +                  rangeQueryPair.getValue()),
    +              java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +        } else {
    +          String bound = null;
    +          if (previousUpperBound == null) {
    +            bound = getRangeQueryPair().getKey();
    +          } else {
    +            bound = previousUpperBound;
    +          }
    +          ps = store.getConnection().prepareStatement(
    +              JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
    +              java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +          isPollable = true;
    +        }
    +        isReplayed = false;
    +        LOG.debug("Prepared statement after re-initialization - " + 
ps.toString());
    +      } catch (SQLException e) {
    +        // TODO Auto-generated catch block
    +        throw new RuntimeException(e);
    +      }
    +    }
    +
    +    //Reset the pollable query with the updated upper and lower bounds
    +    if (isPollable) {
    +      try {
    +        String bound = null;
    +        if (previousUpperBound == null && highestPolled == null) {
    +          bound = getRangeQueryPair().getKey();
    +        } else {
    +          bound = highestPolled;
    +        }
    +        ps = store.getConnection().prepareStatement(
    +            JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
    +            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +        LOG.debug("Polling query " + ps.toString() + "," + 
currentWindowId);
    +        isPolled = false;
    +      } catch (SQLException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +
    +    lower = null;
    +    upper = null;
    +
    +    //Check if a thread is already active and start only if its no
    +    //Do not start the thread from setup, will conflict with the replay
    +    if (dbPoller == null && !isReplayed) {
    +      //If this is not a replayed state, reset the ps to highest read 
offset + 1, 
    +      //keep the upper bound as the one that was initialized after static 
partitioning
    +      LOG.info("Statement when re-initialized" + ps.toString());
    +      dbPoller = new Thread(new DBPoller());
    +      dbPoller.start();
    +    }
    +  }
    +
    +  @Override
    +  public void emitTuples()
    +  {
    +    if (isReplayed) {
    +      return;
    +    }
    +
    +    List<T> tuples;
    +
    +    if ((tuples = emitQueue.poll()) != null) {
    +      for (Object tuple : tuples) {
    +        if (lower == null) {
    +          lower = tuple.toString();
    +        }
    +        upper = tuple.toString();
    +        outputPort.emit((T)tuple);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    +    try {
    +      if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
    +        if (currentWindowRecoveryState == null) {
    +          currentWindowRecoveryState = new MutablePair<String, String>();
    +        }
    +        if (lower != null && upper != null) {
    +          previousUpperBound = upper;
    +          isPolled = true;
    +        }
    +        MutablePair<String, String> windowBoundaryPair = new 
MutablePair<>(lower, upper);
    +        currentWindowRecoveryState = windowBoundaryPair;
    +        windowManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
    +      }
    +    } catch (IOException e) {
    +      throw new RuntimeException("saving recovery", e);
    +    }
    +    currentWindowRecoveryState = new MutablePair<>();
    +  }
    +
    +  public int getPartitionCount()
    +  {
    +    return partitionCount;
    +  }
    +
    +  public void setPartitionCount(int partitionCount)
    +  {
    +    this.partitionCount = partitionCount;
    +  }
    +
    +  @Override
    +  public void activate(Context cntxt)
    +  {
    +    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
    +        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
    +      // If it is a replay state, don't start any threads here
    +      return;
    +    }
    +  }
    +
    +  @Override
    +  public void deactivate()
    +  {
    +    try {
    +      if (dbPoller != null && dbPoller.isAlive()) {
    +        dbPoller.interrupt();
    +        dbPoller.join();
    +      }
    +    } catch (InterruptedException ex) {
    +      // log and ignore, ending execution anyway
    +      LOG.error("exception in poller thread: ", ex);
    +    }
    +  }
    +
    +  @Override
    +  public void handleIdleTime()
    +  {
    +    if (execute) {
    +      try {
    +        Thread.sleep(spinMillis);
    +      } catch (InterruptedException ie) {
    +        throw new RuntimeException(ie);
    +      }
    +    } else {
    +      LOG.error("Exception: ", cause);
    +      DTThrowable.rethrow(cause.get());
    +    }
    +  }
    +
    +  protected void replay(long windowId) throws SQLException
    +  {
    +    isReplayed = true;
    +
    +    MutablePair<String, String> recoveredData = new MutablePair<String, 
String>();
    +    try {
    +      recoveredData = (MutablePair<String, 
String>)windowManager.load(operatorId, windowId);
    +
    +      if (recoveredData != null) {
    +        //skip the window and return if there was no incoming data in the 
window
    +        if (recoveredData.left == null || recoveredData.right == null) {
    +          return;
    +        }
    +
    +        if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
    +          LOG.info("Matched so returning");
    +          return;
    +        }
    +
    +        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
    +        jdbcPoller.setStore(store);
    +        jdbcPoller.setKey(getKey());
    +        jdbcPoller.setPartitionCount(getPartitionCount());
    +        jdbcPoller.setPollInterval(getPollInterval());
    +        jdbcPoller.setTableName(getTableName());
    +        jdbcPoller.setBatchSize(getBatchSize());
    +        isPollable = false;
    +
    +        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
    +
    +        jdbcPoller.setRangeQueryPair(new KeyValPair<String, 
String>(recoveredData.left, recoveredData.right));
    +
    +        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
    +            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), 
jdbcPoller.getKey(),
    +                jdbcPoller.getRangeQueryPair().getKey(), 
jdbcPoller.getRangeQueryPair().getValue()),
    +            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +        LOG.info("Query formed for recovered data -" + 
jdbcPoller.ps.toString());
    +
    +        emitReplayedTuples(jdbcPoller.ps);
    +      }
    +    } catch (IOException e) {
    +      throw new RuntimeException("replay", e);
    +    }
    +
    +  }
    +
    +  /**
    +   * Replays the tuples in sync mode for replayed windows
    +   */
    +  public void emitReplayedTuples(PreparedStatement ps)
    +  {
    +    LOG.debug("Emitting replayed statement is -" + ps.toString());
    +    ResultSet rs = null;
    +    try (PreparedStatement pStat = ps;) {
    +      pStat.setFetchSize(getFetchSize());
    +      LOG.debug("sql query = {}", pStat);
    +      rs = pStat.executeQuery();
    +      if (rs == null || rs.isClosed()) {
    +        LOG.debug("Nothing to replay");
    +        return;
    +      }
    +      while (rs.next()) {
    +        previousUpperBound = rs.getObject(getKey()).toString();
    +        outputPort.emit((T)rs.getObject(getKey()));
    +      }
    +    } catch (SQLException ex) {
    +      throw new RuntimeException(ex);
    +    }
    +  }
    +
    +  /**
    +   * Uses a static partitioning scheme to initialize operator partitions 
with
    +   * non-overlapping key ranges to read In addition to 'n' partitions, 
'n+1'
    +   * partition is a polling partition which reads the records beyond the 
given
    +   * range
    +   */
    +  @Override
    +  public 
Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>>
 definePartitions(
    +      
Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>>
 partitions,
    +      com.datatorrent.api.Partitioner.PartitioningContext context)
    +  {
    +    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new 
ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
    +        getPartitionCount());
    +    JdbcStore jdbcStore = new JdbcStore();
    +    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
    +    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
    +    jdbcStore.setConnectionProperties(store.getConnectionProperties());
    +
    +    jdbcStore.connect();
    +
    +    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = 
null;
    +    try {
    +      partitionToRangeMap = 
JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
    +          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), 
getTableName(), getKey(),
    +          store.getConnectionProperties().getProperty(user), 
store.getConnectionProperties().getProperty(password),
    +          whereCondition, emitColumnList);
    +    } catch (SQLException e) {
    +      LOG.error("Exception in initializing the partition range", e);
    +    }
    +
    +    KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = 
KryoCloneUtils.createCloneUtils(this);
    +
    +    for (int i = 0; i <= getPartitionCount(); i++) {
    +      AbstractJdbcPollInputOperator<T> jdbcPoller = null;
    +
    +      jdbcPoller = cloneUtils.getClone();
    +
    +      jdbcPoller.setStore(store);
    +      jdbcPoller.setKey(getKey());
    +      jdbcPoller.setPartitionCount(getPartitionCount());
    +      jdbcPoller.setPollInterval(getPollInterval());
    +      jdbcPoller.setTableName(getTableName());
    +      jdbcPoller.setBatchSize(getBatchSize());
    +      jdbcPoller.setEmitColumnList(getEmitColumnList());
    +
    +      store.connect();
    +      //The n given partitions are for range queries and n + 1 partition 
is for polling query
    +      //The upper bound for the n+1 partition is set to null since its a 
pollable partition
    +      if (i < getPartitionCount()) {
    +        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
    +        isPollable = false;
    +      } else {
    +        jdbcPoller.setRangeQueryPair(new KeyValPair<String, 
String>(partitionToRangeMap.get(i - 1).getValue(), null));
    +        isPollable = true;
    +      }
    +      Partition<AbstractJdbcPollInputOperator<T>> po = new 
DefaultPartition<AbstractJdbcPollInputOperator<T>>(
    +          jdbcPoller);
    +      newPartitions.add(po);
    +    }
    +
    +    previousUpperBound = null;
    +    return newPartitions;
    +  }
    +
    +  @Override
    +  public void partitioned(
    +      Map<Integer, 
com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> 
partitions)
    +  {
    +    //Nothing to implement here
    +  }
    +
    +  /**
    +   * This class polls a store that can be queried with a JDBC interface The
    +   * preparedStatement is updated as more rows are read
    +   */
    +  public class DBPoller implements Runnable
    +  {
    +    @Override
    +    public void run()
    +    {
    +      while (execute) {
    +        try {
    +          long startTs = System.currentTimeMillis();
    +          if ((isPollable && !isPolled) || !isPollable) {
    +            pollRecords(ps);
    +          }
    +          long endTs = System.currentTimeMillis();
    +          long ioTime = endTs - startTs;
    +          long sleepTime = pollInterval - ioTime;
    +          LOG.debug("pollInterval = " + pollInterval + " I/O time = " + 
ioTime + " sleepTime = " + sleepTime);
    --- End diff --
    
    Use the SLF4J {} placeholder syntax for this log message; with string 
concatenation the message is built even when the log level is not enabled.


> Add jdbc poller input operator
> ------------------------------
>
>                 Key: APEXMALHAR-2066
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Ashwin Chandra Putta
>            Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. poll from external jdbc store asynchronously in the input operator.
> 2. polling frequency and batch size should be configurable.
> 3. should be idempotent.
> 4. should be partition-able.
> 5. should be batch + polling capable.
> Assumptions for idempotency & partitioning,
> 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key.
> 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be 
> given.
> 3.This operator uses static partitioning to arrive at range queries for 
> exactly once reads
> 4.Assumption is that there is an ordered column using which range queries can 
> be formed
> 5.If an emitColumnList is provided, please ensure that the keyColumn is the 
> first column in the list
> 6.Range queries are formed using the JdbcMetaDataUtility Output - comma 
> separated list of the emit columns eg columnA,columnB,columnC
> Per window, the first and the last keys processed are saved using the 
> FSWindowDataManager - (<lowerBound,upperBound>,operatorId,windowId). This 
> (lowerBound,upperBound) pair is then used for recovery. The queries are 
> constructed using the JdbcMetaDataUtility.
> JdbcMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL 
> table. This class would emit range queries based on a primary index given.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to