Repository: nifi
Updated Branches:
  refs/heads/master 04db806ac -> ad7808f63


NIFI-2641: This closes #928. Add max values as attributes to QueryDatabaseTable


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad7808f6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad7808f6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad7808f6

Branch: refs/heads/master
Commit: ad7808f63dd86632b89463e72bd1ad0be48e9b59
Parents: 04db806
Author: Matt Burgess <[email protected]>
Authored: Wed Aug 24 09:38:38 2016 -0400
Committer: joewitt <[email protected]>
Committed: Thu Aug 25 14:13:55 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/QueryDatabaseTable.java   |  9 ++++++++-
 .../processors/standard/QueryDatabaseTableTest.java    | 13 ++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ad7808f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 3b68e29..d7f4b24 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -89,7 +89,9 @@ import java.util.concurrent.atomic.AtomicLong;
         @WritesAttribute(attribute="fragment.index", description="If 'Max Rows 
Per Flow File' is set then the position of this FlowFile in the list of "
                 + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order  "
-                + "FlowFiles were produced")})
+                + "FlowFiles were produced"),
+        @WritesAttribute(attribute = "maxvalue.*", description = "Each 
attribute contains the observed maximum value of a specified 'Maximum-value 
Column'. The "
+                + "suffix of the attribute is the name of the column")})
 @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression 
Language", supportsExpressionLanguage = false, description = "Specifies an 
initial "
         + "max value for max value columns. Properties should be added in the 
format `initial.maxvalue.{max_value_column}`.")
 public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@@ -258,6 +260,11 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
                             fileToProcess = 
session.putAttribute(fileToProcess, "fragment.index", 
String.valueOf(fragmentIndex));
                         }
 
+                        // Add maximum values as attributes
+                        for (Map.Entry<String, String> entry : 
statePropertyMap.entrySet()) {
+                            fileToProcess = 
session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), 
entry.getValue());
+                        }
+
                         logger.info("{} contains {} Avro records; transferring 
to 'success'",
                                 new Object[]{fileToProcess, nrOfRows.get()});
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad7808f6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 40fba54..e97e529 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -182,7 +182,9 @@ public class QueryDatabaseTableTest {
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
 
-        InputStream in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
         runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
         assertEquals(3, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
@@ -196,7 +198,9 @@ public class QueryDatabaseTableTest {
         stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
-        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
 
         // Sanity check - run again, this time no flowfiles/rows should be 
transferred
@@ -212,7 +216,10 @@ public class QueryDatabaseTableTest {
         stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
-        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+        assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 
03:23:34.234");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
 

Reply via email to