Repository: nifi Updated Branches: refs/heads/master 04db806ac -> ad7808f63
NIFI-2641: This closes #928. Add max values as attributes to QueryDatabaseTable Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad7808f6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad7808f6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad7808f6 Branch: refs/heads/master Commit: ad7808f63dd86632b89463e72bd1ad0be48e9b59 Parents: 04db806 Author: Matt Burgess <[email protected]> Authored: Wed Aug 24 09:38:38 2016 -0400 Committer: joewitt <[email protected]> Committed: Thu Aug 25 14:13:55 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/QueryDatabaseTable.java | 9 ++++++++- .../processors/standard/QueryDatabaseTableTest.java | 13 ++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ad7808f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 3b68e29..d7f4b24 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -89,7 +89,9 @@ import java.util.concurrent.atomic.AtomicLong; @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " - + "FlowFiles were produced")}) + + "FlowFiles were produced"), + @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The " + + "suffix of the attribute is the name of the column")}) @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @@ -258,6 +260,11 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); } + // Add maximum values as attributes + for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) { + fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue()); + } + logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); http://git-wip-us.apache.org/repos/asf/nifi/blob/ad7808f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 40fba54..e97e529 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -182,7 +182,9 @@ public class QueryDatabaseTableTest { runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); assertEquals(3, getNumberOfRecordsFromStream(in)); runner.clearTransferState(); @@ -196,7 +198,9 @@ public class QueryDatabaseTableTest { stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "3"); + in = new ByteArrayInputStream(flowFile.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); // Sanity check - run again, this time no flowfiles/rows should be transferred @@ -212,7 +216,10 @@ public class QueryDatabaseTableTest { stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals(flowFile.getAttribute("maxvalue.id"), "4"); + assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234"); + in = new ByteArrayInputStream(flowFile.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); runner.clearTransferState();
