[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372277#comment-15372277
 ] 

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r70385015
  
    --- Diff: 
library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java ---
    @@ -0,0 +1,246 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import org.apache.commons.io.FileUtils;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultPartition;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.lib.helper.OperatorContextTestHelper;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +/**
    + * Tests for {@link AbstractJdbcPollInputOperator} and
    + * {@link JdbcPollInputOperator}
    + */
    +public class JdbcPollerTest
    +{
    +  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
    +  public static final String URL = 
"jdbc:hsqldb:mem:test;sql.syntax_mys=true";
    +
    +  private static final String TABLE_NAME = "test_account_table";
    +  private static String APP_ID = "JdbcPollingOperatorTest";
    +  public String dir = null;
    +
    +  @BeforeClass
    +  public static void setup()
    +  {
    +    try {
    +      cleanup();
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
    +    }
    +    try {
    +      Class.forName(DB_DRIVER).newInstance();
    +
    +      Connection con = DriverManager.getConnection(URL);
    +      Statement stmt = con.createStatement();
    +
    +      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
    +          + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
    +      stmt.executeUpdate(createTable);
    +
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  @AfterClass
    +  public static void cleanup()
    +  {
    +    try {
    +      FileUtils.deleteDirectory(new File("target/" + APP_ID));
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  public static void cleanTable()
    +  {
    +    try {
    +      Connection con = DriverManager.getConnection(URL);
    +      Statement stmt = con.createStatement();
    +      String cleanTable = "delete from " + TABLE_NAME;
    +      stmt.executeUpdate(cleanTable);
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  public static void insertEventsInTable(int numEvents, int offset)
    +  {
    +    try {
    +      Connection con = DriverManager.getConnection(URL);
    +      String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
    +      PreparedStatement stmt = con.prepareStatement(insert);
    +      for (int i = 0; i < numEvents; i++, offset++) {
    +        stmt.setInt(1, offset);
    +        stmt.setString(2, "Account_Holder-" + offset);
    +        stmt.setInt(3, (offset * 1000));
    +        stmt.executeUpdate();
    +      }
    +    } catch (SQLException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * Simulates actual application flow Adds a batch query partitiom, a 
pollable
    +   * partition Incremental record polling is also checked
    +   */
    +  @Test
    +  public void testJdbcPollingInputOperatorBatch() throws 
InterruptedException
    +  {
    +    cleanTable();
    +    insertEventsInTable(10, 0);
    +    JdbcStore store = new JdbcStore();
    +    store.setDatabaseDriver(DB_DRIVER);
    +    store.setDatabaseUrl(URL);
    +
    +    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
    +    this.dir = "target/" + APP_ID + "/";
    +    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    +    attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
    +
    +    JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
    +    inputOperator.setStore(store);
    +    inputOperator.setBatchSize(100);
    +    inputOperator.setPollInterval(1000);
    +    inputOperator.setEmitColumnList("Account_No,Name,Amount");
    +    inputOperator.setKey("Account_No");
    +    inputOperator.setTableName(TABLE_NAME);
    +    inputOperator.setFetchSize(100);
    +    inputOperator.setPartitionCount(1);
    +
    +    CollectorTestSink<Object> sink = new CollectorTestSink<>();
    +    inputOperator.outputPort.setSink(sink);
    +
    +    TestUtils.MockBatchedOperatorStats readerStats = new 
TestUtils.MockBatchedOperatorStats(2);
    +
    +    DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition = 
new DefaultPartition<AbstractJdbcPollInputOperator<Object>>(
    +        inputOperator);
    +
    +    TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> 
pseudoParttion = new TestUtils.MockPartition<>(
    +        apartition, readerStats);
    +
    +    List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> 
newMocks = Lists.newArrayList();
    +
    +    newMocks.add(pseudoParttion);
    +
    +    
Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 newPartitions = inputOperator
    +        .definePartitions(newMocks, null);
    +
    +    
Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 itr = newPartitions
    +        .iterator();
    +
    +    int operatorId = 0;
    +    for 
(com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>
 partition : newPartitions) {
    +
    +      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = 
new Attribute.AttributeMap.DefaultAttributeMap();
    +      this.dir = "target/" + APP_ID + "/";
    +      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
    +      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
    +
    +      OperatorContextTestHelper.TestIdOperatorContext partitioningContext 
= new OperatorContextTestHelper.TestIdOperatorContext(
    +          operatorId++, partitionAttributeMap);
    +
    +      partition.getPartitionedInstance().setup(partitioningContext);
    +      partition.getPartitionedInstance().activate(partitioningContext);
    +    }
    +
    +    //First partition is for range queries,last is for polling queries
    +    AbstractJdbcPollInputOperator<Object> newInstance = 
itr.next().getPartitionedInstance();
    +    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
    +    newInstance.outputPort.setSink(sink1);
    +    newInstance.beginWindow(1);
    +    Thread.sleep(50);
    +    newInstance.emitTuples();
    +    newInstance.endWindow();
    +
    +    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
    +    int i = 0;
    +    for (Object tuple : sink1.collectedTuples) {
    +      String[] pojoEvent = tuple.toString().split(",");
    +      Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? 
true : false);
    +      i++;
    +    }
    +    sink1.collectedTuples.clear();
    +
    +    insertEventsInTable(10, 10);
    +
    +    AbstractJdbcPollInputOperator<Object> pollableInstance = 
itr.next().getPartitionedInstance();
    +
    +    pollableInstance.outputPort.setSink(sink1);
    +
    +    pollableInstance.beginWindow(1);
    +    Thread.sleep(700);
    +    pollableInstance.endWindow();
    +    pollableInstance.emitTuples();
    --- End diff --
    
    This emits tuples outside the window. The emitTuples() call needs to be 
between beginWindow() and endWindow().


> Add jdbc poller input operator
> ------------------------------
>
>                 Key: APEXMALHAR-2066
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Ashwin Chandra Putta
>            Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. poll from external jdbc store asynchronously in the input operator.
> 2. polling frequency and batch size should be configurable.
> 3. should be idempotent.
> 4. should be partition-able.
> 5. should be batch + polling capable.
> Assumptions for idempotency & partitioning:
> 1. User needs to provide tableName, dbConnection, setEmitColumnList, look-up 
> key.
> 2. Optionally batchSize, pollInterval, look-up key and a where clause can be 
> given.
> 3. This operator uses static partitioning to arrive at range queries for 
> exactly once reads.
> 4. Assumption is that there is an ordered column using which range queries 
> can be formed.
> 5. If an emitColumnList is provided, please ensure that the keyColumn is the 
> first column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility. Output - comma 
> separated list of the emit columns, e.g. columnA,columnB,columnC
> Per window, the first and the last key processed are saved using the 
> FSWindowDataManager - (<lowerBound,upperBound>,operatorId,windowId). This 
> (lowerBound,upperBound) pair is then used for recovery. The queries are 
> constructed using the JDBCMetaDataUtility.
> JDBCMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL 
> table. This class would emit range queries based on a primary index given.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to