Repository: nifi
Updated Branches:
  refs/heads/master b0e5e1644 -> d3f54994a
NIFI-4972 - SelectHiveQL to emit FETCH provenance event

SelectHiveQL should emit FETCH instead of CONTENT_MODIFIED when it has incoming connections.

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2543.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3f54994
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3f54994
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3f54994

Branch: refs/heads/master
Commit: d3f54994a6510b5a428f30e65b3d456225749937
Parents: b0e5e16
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Wed Mar 14 17:42:10 2018 +0900
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed Mar 14 10:17:19 2018 +0100

----------------------------------------------------------------------
 .../additionalDetails.html | 2 +-
 .../apache/nifi/processors/hive/SelectHiveQL.java | 7 +++----
 .../nifi/processors/hive/TestSelectHiveQL.java | 17 +++++++++++++++++
 3 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index 38ad684..91c3979 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html @@ -392,7 +392,7 @@ Processor 3</pre> </td> <td> SEND<br/> - RECEIVE<br/> + RECEIVE, FETCH<br/> </td> <td>jdbc:hive2://hive.example.com:10000/default</td> <td>hive_table</td> http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index c15a9e1..832636b 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -410,10 +410,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { new Object[]{flowfile, nrOfRows.get()}); if (context.hasIncomingConnection()) { - // If the flow file came from an incoming connection, issue a Modify Content provenance event - - session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + // If the flow file came from an incoming connection, issue a Fetch provenance event + session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(), + "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } else { // If we created a flow file from rows received from Hive, issue a Receive provenance event session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS)); http://git-wip-us.apache.org/repos/asf/nifi/blob/d3f54994/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 3c3b7f9..bb919d8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -118,11 +120,26 @@ public class TestSelectHiveQL { public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { runner.setIncomingConnection(false); invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro"); + + final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); + final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType()); + assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri()); } @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, 
SQLException, IOException { invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); + + final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); + assertEquals(2, provenanceEvents.size()); + + final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.FETCH, provenance0.getEventType()); + assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri()); + + final ProvenanceEventRecord provenance1 = provenanceEvents.get(1); + assertEquals(ProvenanceEventType.FORK, provenance1.getEventType()); }