[ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408992#comment-15408992 ]
ASF GitHub Bot commented on APEXMALHAR-2172: -------------------------------------------- Github user DT-Priyanka commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/358#discussion_r73648435 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator() public void setup(OperatorContext context) { super.setup(context); + intializeDSLContext(); + if (scanService == null) { + scanService = Executors.newScheduledThreadPool(partitionCount); + } spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); execute = true; cause = new AtomicReference<Throwable>(); - emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity); - this.context = context; + emitQueue = new LinkedBlockingDeque<>(queueCapacity); operatorId = context.getId(); + windowManager.setup(context); + } - try { + private void intializeDSLContext() + { + create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl())); + } - //If its a range query pass upper and lower bounds - //If its a polling query pass only the lower bound - if (getRangeQueryPair().getValue() != null) { - ps = store.getConnection() - .prepareStatement( - JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(), - rangeQueryPair.getValue()), - java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); + @Override + public void activate(OperatorContext context) + { + initializePreparedStatement(); + long largestRecoveryWindow = windowManager.getLargestRecoveryWindow(); + if (largestRecoveryWindow == Stateless.WINDOW_ID + || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { + scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); + } + } + + protected void initializePreparedStatement() + { + try { + // If its a range query pass upper and lower bounds, If its a 
polling query pass only the lower bound + if (isPollerPartition) { + ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE), --- End diff -- rangeQueryPair is calculated during "definePartition" and the user is not supposed to set it. Let me try to make that part of the code more readable. > Update JDBC poll input operator to fix issues > --------------------------------------------- > > Key: APEXMALHAR-2172 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172 > Project: Apache Apex Malhar > Issue Type: Improvement > Reporter: Priyanka Gugale > Assignee: Priyanka Gugale > > Update JDBCPollInputOperator to: > 1. Fix small bugs > 2. Use jooq query dsl library to construct sql queries > 3. Make code more readable > 4. Use row counts rather than key column values to partition reads -- This message was sent by Atlassian JIRA (v6.3.4#6332)