[ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15373332#comment-15373332 ]
ASF GitHub Bot commented on APEXMALHAR-2066: -------------------------------------------- Github user devtagare commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/282#discussion_r70488149 --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java --- @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; + +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Abstract operator for for consuming data using JDBC interface<br> + * User needs User needs to provide + * tableName,dbConnection,setEmitColumnList,look-up key <br> + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given + * <br> + * This operator uses static partitioning to arrive at range queries for exactly + * once reads<br> + * Assumption is that there is an ordered column using which range queries can + * be formed<br> + * If an emitColumnList is provided, please ensure that the keyColumn is the + * first column in the list<br> + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output - + * comma separated list of the emit columns eg columnA,columnB,columnC + * + * @displayName Jdbc Polling Input Operator + * @category Input + * @tags database, sql, jdbc, partitionable,exactlyOnce + */ +public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> --- End diff -- Done > Add jdbc poller input operator > ------------------------------ > > Key: APEXMALHAR-2066 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Ashwin Chandra Putta > Assignee: devendra tagare > > Create a JDBC poller input operator that has the following features. > 1. poll from external jdbc store asynchronously in the input operator. > 2. polling frequency and batch size should be configurable. > 3. should be idempotent. > 4. should be partition-able. > 5. should be batch + polling capable. > Assumptions for idempotency & partitioning, > 1.User needs to provide tableName,dbConnection,setEmitColumnList,look-up key. > 2.Optionally batchSize,pollInterval,Look-up key and a where clause can be > given. > 3.This operator uses static partitioning to arrive at range queries for > exactly once reads > 4.Assumption is that there is an ordered column using which range queries can > be formed<br> > 5.If an emitColumnList is provided, please ensure that the keyColumn is the > first column in the list > 6.Range queries are formed using the JdbcMetaDataUtility Output - comma > separated list of the emit columns eg columnA,columnB,columnC > Per window the first and the last key processed is saved using the > FSWindowDataManager - (<lowerBound,UpperBound>,operatorId,windowId).This > (lowerBound,upperBoundPair) is then used for recovery.The queries are > constructed using the JDBCMetaDataUtility. > JDBCMetaDataUtility > A utility class used to retrieve the metadata for a given unique key of a SQL > table. This class would emit range queries based on a primary index given. -- This message was sent by Atlassian JIRA (v6.3.4#6332)