[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413008#comment-15413008
 ] 

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74003272
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
 ---
    @@ -286,271 +146,233 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(1);
    +    }
         execute = true;
    -    cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), 
JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
    +
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) 
> largestRecoveryWindow) {
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
    +    }
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), 
getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound
    +      if (isPollerPartition) {
    +        ps = 
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), 
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } else {
             ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), rangeQueryPair.getKey()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    -        isPollable = true;
    +            buildRangeQuery(rangeQueryPair.getKey(), 
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           }
    -
         } catch (SQLException e) {
           LOG.error("Exception in initializing the range query for a given 
partition", e);
           throw new RuntimeException(e);
         }
     
    -    windowManager.setup(context);
    -    LOG.debug("super setup done...");
       }
     
       @Override
       public void beginWindow(long windowId)
       {
         currentWindowId = windowId;
    -
    -    isReplayed = false;
    -
         if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           try {
             replay(currentWindowId);
    +        return;
           } catch (SQLException e) {
             LOG.error("Exception in replayed windows", e);
             throw new RuntimeException(e);
           }
         }
    -
    -    if (isReplayed && currentWindowId == 
windowManager.getLargestRecoveryWindow()) {
    -      try {
    -        if (!isPollable && rangeQueryPair.getValue() != null) {
    -
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), 
getKey(), previousUpperBound,
    -                  rangeQueryPair.getValue()),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    -        } else {
    -          String bound = null;
    -          if (previousUpperBound == null) {
    -            bound = getRangeQueryPair().getKey();
    -          } else {
    -            bound = previousUpperBound;
    -          }
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    -          isPollable = true;
    -        }
    -        isReplayed = false;
    -        LOG.debug("Prepared statement after re-initialization - {} ", 
ps.toString());
    -      } catch (SQLException e) {
    -        // TODO Auto-generated catch block
    -        throw new RuntimeException(e);
    -      }
    +    if (isPollerPartition) {
    +      updatePollQuery();
    +      isPolled = false;
         }
    +    lowerBound = lastEmittedRecord;
    +  }
     
    -    //Reset the pollable query with the updated upper and lower bounds
    -    if (isPollable) {
    +  private void updatePollQuery()
    +  {
    +    if ((lastPolledBound != lastEmittedRecord)) {
    +      if (lastEmittedRecord == null) {
    +        lastPolledBound = rangeQueryPair.getKey();
    +      } else {
    +        lastPolledBound = lastEmittedRecord;
    +      }
           try {
    -        String bound = null;
    -        if (previousUpperBound == null && highestPolled == null) {
    -          bound = getRangeQueryPair().getKey();
    -        } else {
    -          bound = highestPolled;
    -        }
    -        ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), 
getKey(), bound),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
    -        isPolled = false;
    +        ps = 
store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound, 
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } catch (SQLException e) {
             throw new RuntimeException(e);
           }
         }
     
    -    lower = null;
    -    upper = null;
    -
    -    //Check if a thread is already active and start only if its no
    -    //Do not start the thread from setup, will conflict with the replay
    -    if (dbPoller == null && !isReplayed) {
    -      //If this is not a replayed state, reset the ps to highest read 
offset + 1, 
    -      //keep the upper bound as the one that was initialized after static 
partitioning
    -      LOG.info("Statement when re-initialized {}", ps.toString());
    -      dbPoller = new Thread(new DBPoller());
    -      dbPoller.start();
    -    }
       }
     
       @Override
       public void emitTuples()
       {
    -    if (isReplayed) {
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           return;
         }
    -
    -    List<T> tuples;
    -
    -    if ((tuples = emitQueue.poll()) != null) {
    -      for (Object tuple : tuples) {
    -        if (lower == null) {
    -          lower = tuple.toString();
    -        }
    -        upper = tuple.toString();
    -        outputPort.emit((T)tuple);
    +    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : 
batchSize;
    +    while (pollSize-- > 0) {
    +      T obj = emitQueue.poll();
    +      if (obj != null) {
    +        emitTuple(obj);
           }
    +      lastEmittedRecord++;
         }
       }
     
    +  protected void emitTuple(T obj)
    +  {
    +    outputPort.emit(obj);
    +  }
    +
       @Override
       public void endWindow()
       {
         try {
           if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
    -        if (currentWindowRecoveryState == null) {
    -          currentWindowRecoveryState = new MutablePair<String, String>();
    -        }
    -        if (lower != null && upper != null) {
    -          previousUpperBound = upper;
    -          isPolled = true;
    -        }
    -        MutablePair<String, String> windowBoundaryPair = new 
MutablePair<>(lower, upper);
    -        currentWindowRecoveryState = windowBoundaryPair;
    +        currentWindowRecoveryState = new MutablePair<>(lowerBound, 
lastEmittedRecord);
             windowManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
           }
         } catch (IOException e) {
           throw new RuntimeException("saving recovery", e);
         }
    -    currentWindowRecoveryState = new MutablePair<>();
    -  }
    -
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    +    if (threadException != null) {
    +      store.disconnect();
    +      DTThrowable.rethrow(threadException.get());
    +    }
       }
     
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowManager.getLargestRecoveryWindow()) {
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.offer(getTuple(result));
    +        } while (result.next());
           }
    -    } catch (InterruptedException ex) {
    -      // log and ignore, ending execution anyway
    -      LOG.error("exception in poller thread: ", ex);
    -    }
    -  }
    -
    -  @Override
    -  public void handleIdleTime()
    -  {
    -    if (execute) {
    -      try {
    -        Thread.sleep(spinMillis);
    -      } catch (InterruptedException ie) {
    -        throw new RuntimeException(ie);
    +      isPolled = true;
    +    } catch (SQLException ex) {
    +      execute = false;
    +      threadException = new AtomicReference<Throwable>(ex);
    +    } finally {
    +      if (!isPollerPartition) {
    +        store.disconnect();
           }
    -    } else {
    -      LOG.error("Exception: ", cause);
    -      DTThrowable.rethrow(cause.get());
         }
       }
     
    +  public abstract T getTuple(ResultSet result);
    +
       protected void replay(long windowId) throws SQLException
       {
    -    isReplayed = true;
     
    -    MutablePair<String, String> recoveredData = new MutablePair<String, 
String>();
         try {
    -      recoveredData = (MutablePair<String, 
String>)windowManager.load(operatorId, windowId);
    -
    -      if (recoveredData != null) {
    -        //skip the window and return if there was no incoming data in the 
window
    -        if (recoveredData.left == null || recoveredData.right == null) {
    -          return;
    -        }
    +      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, 
Integer>)windowManager.load(operatorId,
    +          windowId);
     
    -        if (recoveredData.right.equals(rangeQueryPair.getValue()) || 
recoveredData.right.equals(previousUpperBound)) {
    -          LOG.info("Matched so returning");
    -          return;
    -        }
    -
    -        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
    -        jdbcPoller.setStore(store);
    -        jdbcPoller.setKey(getKey());
    -        jdbcPoller.setPartitionCount(getPartitionCount());
    -        jdbcPoller.setPollInterval(getPollInterval());
    -        jdbcPoller.setTableName(getTableName());
    -        jdbcPoller.setBatchSize(getBatchSize());
    -        isPollable = false;
    +      if (recoveredData != null && shouldReplayWindow(recoveredData)) {
    +        LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
    +            recoveredData.right);
     
    -        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + 
"," + recoveredData.right + "]");
    +        ps = store.getConnection().prepareStatement(
    +            buildRangeQuery(recoveredData.left, (recoveredData.right - 
recoveredData.left)), TYPE_FORWARD_ONLY,
    +            CONCUR_READ_ONLY);
    +        LOG.info("Query formed to recover data - {}", ps.toString());
     
    -        jdbcPoller.setRangeQueryPair(new KeyValPair<String, 
String>(recoveredData.left, recoveredData.right));
    +        emitReplayedTuples(ps);
     
    -        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), 
jdbcPoller.getKey(),
    -                jdbcPoller.getRangeQueryPair().getKey(), 
jdbcPoller.getRangeQueryPair().getValue()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, 
java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.info("Query formed for recovered data - {}", 
jdbcPoller.ps.toString());
    +      }
     
    -        emitReplayedTuples(jdbcPoller.ps);
    +      if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
    +        try {
    +          if (!isPollerPartition && rangeQueryPair.getValue() != null) {
    +            ps = store.getConnection().prepareStatement(
    +                buildRangeQuery(lastEmittedRecord, 
(rangeQueryPair.getValue() - lastEmittedRecord)), TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          } else {
    +            Integer bound = null;
    +            if (lastEmittedRecord == null) {
    +              bound = rangeQueryPair.getKey();
    +            } else {
    +              bound = lastEmittedRecord;
    +            }
    +            ps = 
store.getConnection().prepareStatement(buildRangeQuery(bound, 
Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          }
    +          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, 
TimeUnit.MILLISECONDS);
    +        } catch (SQLException e) {
    +          throw new RuntimeException(e);
    +        }
           }
         } catch (IOException e) {
           throw new RuntimeException("replay", e);
    --- End diff --
    
    Could this exception use a more descriptive message than "replay"?


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix minor bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make the code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to