This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1d18735 NIFI-6348: Added Custom ORDER BY Column property to
GenerateTableFetch NIFI-6348: Fixed doc
1d18735 is described below
commit 1d18735076d134af2ec7c484c982746e261ed4a5
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Jun 4 15:50:41 2019 -0400
NIFI-6348: Added Custom ORDER BY Column property to GenerateTableFetch
NIFI-6348: Fixed doc
This closes #3515.
Signed-off-by: Bryan Bende <[email protected]>
---
.../processors/standard/GenerateTableFetch.java | 16 ++++++-
.../standard/TestGenerateTableFetch.java | 49 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 4d6ccc8..fadf51c 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -154,6 +154,18 @@ public class GenerateTableFetch extends
AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+    // Optional column name used as the ORDER BY clause of the generated queries. It only takes effect when
+    // partitioning is enabled (Partition Size != 0) and no Max-Value Columns are configured; the value may
+    // reference FlowFile attributes via Expression Language.
+    static final PropertyDescriptor CUSTOM_ORDERBY_COLUMN = new PropertyDescriptor.Builder()
+            .name("gen-table-custom-orderby-column")
+            .displayName("Custom ORDER BY Column")
+            .description("The name of a column to be used for ordering the results if Max-Value Columns are not provided and partitioning is enabled. This property is ignored if either "
+                    + "Max-Value Columns is set or Partition Size = 0. NOTE: If neither Max-Value Columns nor Custom ORDER BY Column is set, then depending on "
+                    + "the database/driver, the processor may report an error and/or the generated SQL may result in missing and/or duplicate rows. This is because without an explicit "
+                    + "ordering, fetching each partition is done using an arbitrary ordering.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query
execution (using an incoming FlowFile) failed. The incoming FlowFile will be
penalized and routed to this relationship. "
@@ -176,6 +188,7 @@ public class GenerateTableFetch extends
AbstractDatabaseFetchProcessor {
pds.add(PARTITION_SIZE);
pds.add(COLUMN_FOR_VALUE_PARTITIONING);
pds.add(WHERE_CLAUSE);
+ pds.add(CUSTOM_ORDERBY_COLUMN);
pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -260,6 +273,7 @@ public class GenerateTableFetch extends
AbstractDatabaseFetchProcessor {
final String columnForPartitioning =
context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean useColumnValsForPaging =
!StringUtils.isEmpty(columnForPartitioning);
final String customWhereClause =
context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
+ final String customOrderByColumn =
context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean outputEmptyFlowFileOnZeroResults =
context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
final StateManager stateManager = context.getStateManager();
@@ -497,7 +511,7 @@ public class GenerateTableFetch extends
AbstractDatabaseFetchProcessor {
whereClause = maxValueClauses.isEmpty() ? "1=1" :
StringUtils.join(maxValueClauses, " AND ");
Long offset = partitionSize == 0 ? null : i *
partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
// Don't use an ORDER BY clause if there's only one
partition
- final String orderByClause = partitionSize == 0 ? null
: maxColumnNames;
+ final String orderByClause = partitionSize == 0 ? null
: (maxColumnNames.isEmpty() ? customOrderByColumn : maxColumnNames);
final String query =
dbAdapter.getSelectStatement(tableName, columnNames, whereClause,
orderByClause, limit, offset, columnForPartitioning);
FlowFile sqlFlowFile = (fileToProcess == null) ?
session.create() : session.create(fileToProcess);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 25a5918..89fda05 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1609,6 +1609,55 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
+ @Test
+ public void testCustomOrderByColumn() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null,
name varchar(100), scale float, created_on timestamp, bignum bigint default
0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale,
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME,
"TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.CUSTOM_ORDERBY_COLUMN, "SCALE");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY
SCALE FETCH NEXT 2 ROWS ONLY", query);
+ flowFile.assertAttributeEquals(FRAGMENT_INDEX, "0");
+ flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY
SCALE OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+ flowFile.assertAttributeEquals(FRAGMENT_INDEX, "1");
+ flowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ runner.clearTransferState();
+ }
+
/**
* Simple implementation only for GenerateTableFetch processor testing.
*/