This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d18735  NIFI-6348: Added Custom ORDER BY Column property to 
GenerateTableFetch NIFI-6348: Fixed doc
1d18735 is described below

commit 1d18735076d134af2ec7c484c982746e261ed4a5
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Jun 4 15:50:41 2019 -0400

    NIFI-6348: Added Custom ORDER BY Column property to GenerateTableFetch
    NIFI-6348: Fixed doc
    
    This closes #3515.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../processors/standard/GenerateTableFetch.java    | 16 ++++++-
 .../standard/TestGenerateTableFetch.java           | 49 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 4d6ccc8..fadf51c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -154,6 +154,18 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor CUSTOM_ORDERBY_COLUMN = new 
PropertyDescriptor.Builder()
+            .name("gen-table-custom-orderby-column")
+            .displayName("Custom ORDER BY Column")
+            .description("The name of a column to be used for ordering the 
results if Max-Value Columns are not provided and partitioning is enabled. This 
property is ignored if either "
+                    + "Max-Value Columns is set or Partition Size = 0. NOTE: 
If neither Max-Value Columns nor Custom ORDER BY Column is set, then depending 
on the "
+                    + "the database/driver, the processor may report an error 
and/or the generated SQL may result in missing and/or duplicate rows. This is 
because without an explicit "
+                    + "ordering, fetching each partition is done using an 
arbitrary ordering.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("This relationship is only used when SQL query 
execution (using an incoming FlowFile) failed. The incoming FlowFile will be 
penalized and routed to this relationship. "
@@ -176,6 +188,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         pds.add(PARTITION_SIZE);
         pds.add(COLUMN_FOR_VALUE_PARTITIONING);
         pds.add(WHERE_CLAUSE);
+        pds.add(CUSTOM_ORDERBY_COLUMN);
         pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -260,6 +273,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         final String columnForPartitioning = 
context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean useColumnValsForPaging = 
!StringUtils.isEmpty(columnForPartitioning);
         final String customWhereClause = 
context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String customOrderByColumn = 
context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean outputEmptyFlowFileOnZeroResults = 
context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
 
         final StateManager stateManager = context.getStateManager();
@@ -497,7 +511,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                         whereClause = maxValueClauses.isEmpty() ? "1=1" : 
StringUtils.join(maxValueClauses, " AND ");
                         Long offset = partitionSize == 0 ? null : i * 
partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
                         // Don't use an ORDER BY clause if there's only one 
partition
-                        final String orderByClause = partitionSize == 0 ? null 
: maxColumnNames;
+                        final String orderByClause = partitionSize == 0 ? null 
: (maxColumnNames.isEmpty() ? customOrderByColumn : maxColumnNames);
 
                         final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
orderByClause, limit, offset, columnForPartitioning);
                         FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 25a5918..89fda05 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1609,6 +1609,55 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testCustomOrderByColumn() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.CUSTOM_ORDERBY_COLUMN, "SCALE");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY 
SCALE FETCH NEXT 2 ROWS ONLY", query);
+        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0");
+        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY 
SCALE OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+        flowFile.assertAttributeEquals(FRAGMENT_INDEX, "1");
+        flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        runner.clearTransferState();
+    }
+
     /**
      * Simple implementation only for GenerateTableFetch processor testing.
      */

Reply via email to