Repository: nifi
Updated Branches:
  refs/heads/master b0e5e1644 -> d3f54994a


NIFI-4972 - SelectHiveQL to emit FETCH provenance event

SelectHiveQL should emit FETCH instead of CONTENT_MODIFIED when it has
incoming connections.

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2543.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3f54994
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3f54994
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3f54994

Branch: refs/heads/master
Commit: d3f54994a6510b5a428f30e65b3d456225749937
Parents: b0e5e16
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Wed Mar 14 17:42:10 2018 +0900
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed Mar 14 10:17:19 2018 +0100

----------------------------------------------------------------------
 .../additionalDetails.html                         |  2 +-
 .../apache/nifi/processors/hive/SelectHiveQL.java  |  7 +++----
 .../nifi/processors/hive/TestSelectHiveQL.java     | 17 +++++++++++++++++
 3 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index 38ad684..91c3979 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -392,7 +392,7 @@ Processor 3</pre>
                 </td>
                 <td>
                     SEND<br/>
-                    RECEIVE<br/>
+                    RECEIVE, FETCH<br/>
                 </td>
                 <td>jdbc:hive2://hive.example.com:10000/default</td>
                 <td>hive_table</td>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index c15a9e1..832636b 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -410,10 +410,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                                 new Object[]{flowfile, nrOfRows.get()});
 
                         if (context.hasIncomingConnection()) {
-                            // If the flow file came from an incoming 
connection, issue a Modify Content provenance event
-
-                            
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + 
nrOfRows.get() + " rows",
-                                    
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                            // If the flow file came from an incoming 
connection, issue a Fetch provenance event
+                            session.getProvenanceReporter().fetch(flowfile, 
dbcpService.getConnectionURL(),
+                                    "Retrieved " + nrOfRows.get() + " rows", 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         } else {
                             // If we created a flow file from rows received 
from Hive, issue a Receive provenance event
                             session.getProvenanceReporter().receive(flowfile, 
dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
index 3c3b7f9..bb919d8 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.dbcp.hive.HiveDBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -118,11 +120,26 @@ public class TestSelectHiveQL {
     public void testNoIncomingConnection() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
         runner.setIncomingConnection(false);
         invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
+
+        final List<ProvenanceEventRecord> provenanceEvents = 
runner.getProvenanceEvents();
+        final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType());
+        assertEquals("jdbc:derby:target/db;create=true", 
provenance0.getTransitUri());
     }
 
     @Test
     public void testNoTimeLimit() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
         invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
+
+        final List<ProvenanceEventRecord> provenanceEvents = 
runner.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.FETCH, provenance0.getEventType());
+        assertEquals("jdbc:derby:target/db;create=true", 
provenance0.getTransitUri());
+
+        final ProvenanceEventRecord provenance1 = provenanceEvents.get(1);
+        assertEquals(ProvenanceEventType.FORK, provenance1.getEventType());
     }
 
 

Reply via email to