Repository: nifi
Updated Branches:
  refs/heads/master 8ffa1703b -> 9cfc13423


NIFI-3455 Large row count paging

This closes #1499.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9cfc1342
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9cfc1342
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9cfc1342

Branch: refs/heads/master
Commit: 9cfc13423dd1a18dd4f4554eb0fb0d5b300c3e6e
Parents: 8ffa170
Author: patricker <[email protected]>
Authored: Thu Feb 9 10:45:01 2017 -0700
Committer: Pierre Villard <[email protected]>
Committed: Tue Feb 14 19:09:12 2017 +0100

----------------------------------------------------------------------
 .../processors/standard/GenerateTableFetch.java | 14 +++---
 .../processors/standard/db/DatabaseAdapter.java |  2 +-
 .../db/impl/GenericDatabaseAdapter.java         |  2 +-
 .../standard/db/impl/OracleDatabaseAdapter.java |  4 +-
 .../standard/TestGenerateTableFetch.java        | 53 +++++++++++++++++++-
 .../standard/db/impl/DerbyDatabaseAdapter.java  |  2 +-
 6 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 966c20d..48ec403 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -231,10 +231,10 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 
             // Build a SELECT query with maximum-value columns (if present)
             final String selectQuery = dbAdapter.getSelectStatement(tableName, 
columnsClause, whereClause, null, null, null);
-            int rowCount = 0;
+            long rowCount = 0;
 
             try (final Connection con = dbcpService.getConnection();
-                 final Statement st = con.createStatement()) {
+                final Statement st = con.createStatement()) {
 
                 final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
@@ -246,7 +246,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 
                 if (resultSet.next()) {
                     // Total row count is in the first column
-                    rowCount = resultSet.getInt(1);
+                    rowCount = resultSet.getLong(1);
 
                     // Update the state map with the newly-observed maximum 
values
                     ResultSetMetaData rsmd = resultSet.getMetaData();
@@ -282,12 +282,12 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                     throw new SQLException("No rows returned from metadata 
query: " + selectQuery);
                 }
 
-                final int numberOfFetches = (partitionSize == 0) ? rowCount : 
(rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+                final long numberOfFetches = (partitionSize == 0) ? rowCount : 
(rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
 
                 // Generate SQL statements to read "pages" of data
-                for (int i = 0; i < numberOfFetches; i++) {
-                    Integer limit = partitionSize == 0 ? null : partitionSize;
-                    Integer offset = partitionSize == 0 ? null : i * 
partitionSize;
+                for (long i = 0; i < numberOfFetches; i++) {
+                    long limit = partitionSize == 0 ? null : partitionSize;
+                    long offset = partitionSize == 0 ? null : i * 
partitionSize;
                     final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
                     FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
                     sqlFlowFile = session.write(sqlFlowFile, out -> 
out.write(query.getBytes()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index 21ab331..b7f3e72 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -34,5 +34,5 @@ public interface DatabaseAdapter {
      * @param offset        The value for the OFFSET clause (i.e. the number 
of rows to skip)
      * @return A String containing a SQL SELECT statement with the given 
clauses applied
      */
-    String getSelectStatement(String tableName, String columnNames, String 
whereClause, String orderByClause, Integer limit, Integer offset);
+    String getSelectStatement(String tableName, String columnNames, String 
whereClause, String orderByClause, Long limit, Long offset);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
index c48d2cd..ae3af7a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -29,7 +29,7 @@ public class GenericDatabaseAdapter implements 
DatabaseAdapter {
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Integer limit, Integer offset) {
+    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Long limit, Long offset) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or 
empty");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
index d918400..9338343 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -29,7 +29,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter 
{
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Integer limit, Integer offset) {
+    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Long limit, Long offset) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or 
empty");
         }
@@ -66,7 +66,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter 
{
         }
         if (nestedSelect) {
             query.append(") a");
-            int offsetVal = 0;
+            long offsetVal = 0;
             if (offset != null) {
                 offsetVal = offset;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index f79f96c..79093b2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -38,9 +38,11 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -49,6 +51,11 @@ import static 
org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -58,6 +65,7 @@ public class TestGenerateTableFetch {
 
     TestRunner runner;
     GenerateTableFetch processor;
+    DBCPServiceSimpleImpl dbcp;
 
     private final static String DB_LOCATION = "target/db_gtf";
 
@@ -93,7 +101,9 @@ public class TestGenerateTableFetch {
     @Before
     public void setUp() throws Exception {
         processor = new GenerateTableFetch();
-        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        //Mock the DBCP Controller Service so we can control the Results
+        dbcp = spy(new DBCPServiceSimpleImpl());
+
         final Map<String, String> dbcpProperties = new HashMap<>();
 
         runner = TestRunners.newTestRunner(processor);
@@ -514,6 +524,47 @@ public class TestGenerateTableFetch {
         assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 
10000 ROWS ONLY", new String(flowFile.toByteArray()));
     }
 
+    @Test
+    public void testRidiculousRowCount() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+        long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 
100;
+        int partitionSize = 1000000;
+        int expectedFileCount = (int)(rowCount/partitionSize) + 1;
+
+        Connection conn = mock(Connection.class);
+        when(dbcp.getConnection()).thenReturn(conn);
+        Statement st = mock(Statement.class);
+        when(conn.createStatement()).thenReturn(st);
+        doNothing().when(st).close();
+        ResultSet rs = mock(ResultSet.class);
+        when(st.executeQuery(anyString())).thenReturn(rs);
+        when(rs.next()).thenReturn(true);
+        when(rs.getInt(1)).thenReturn((int)rowCount);
+        when(rs.getLong(1)).thenReturn(rowCount);
+
+        final ResultSetMetaData resultSetMetaData = 
mock(ResultSetMetaData.class);
+        when(rs.getMetaData()).thenReturn(resultSetMetaData);
+        when(resultSetMetaData.getColumnCount()).thenReturn(2);
+        when(resultSetMetaData.getTableName(1)).thenReturn("");
+        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.INTEGER);
+        when(resultSetMetaData.getColumnName(1)).thenReturn("COUNT");
+        when(resultSetMetaData.getColumnType(2)).thenReturn(Types.INTEGER);
+        when(resultSetMetaData.getColumnName(2)).thenReturn("ID");
+        when(rs.getInt(2)).thenReturn(1000);
+
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, 
Integer.toString(partitionSize));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 
1000000 ROWS ONLY", query);
+        runner.clearTransferState();
+    }
+
 
     /**
      * Simple implementation only for GenerateTableFetch processor testing.

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cfc1342/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
index 7d6ab89..66a473d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
@@ -30,7 +30,7 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Integer limit, Integer offset) {
+    public String getSelectStatement(String tableName, String columnNames, 
String whereClause, String orderByClause, Long limit, Long offset) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or 
empty");
         }

Reply via email to