Repository: nifi
Updated Branches:
  refs/heads/master 2ef7c15b5 -> 1e4369414


NIFI-3057 Added provenance events to PutElasticsearch and FetchElasticsearch

This closes: #1370.

Signed-off-by: Andre F de Miranda <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e436941
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e436941
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e436941

Branch: refs/heads/master
Commit: 1e4369414568f61172f99ec9676ac0c7568934ee
Parents: 2ef7c15
Author: Pierre Villard <[email protected]>
Authored: Thu Dec 29 17:42:24 2016 +0100
Committer: Andre F de Miranda <[email protected]>
Committed: Wed Feb 15 01:37:14 2017 +1100

----------------------------------------------------------------------
 .../nifi/processors/elasticsearch/FetchElasticsearch.java   | 9 +++++++++
 .../nifi/processors/elasticsearch/PutElasticsearch.java     | 8 +++++++-
 .../processors/elasticsearch/TestFetchElasticsearch.java    | 5 +++++
 .../nifi/processors/elasticsearch/TestPutElasticsearch.java | 5 +++++
 4 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1e436941/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
index 643edbb..84f31b7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
@@ -49,6 +49,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -143,6 +144,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
         return propertyDescriptors;
     }
 
+    @Override
     @OnScheduled
     public void setup(ProcessContext context) {
         super.setup(context);
@@ -165,6 +167,8 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
         try {
 
             logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
+            final long startNanos = System.nanoTime();
+
             GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
             if (authToken != null) {
                 getRequestBuilder.putHeader("Authorization", authToken);
@@ -189,6 +193,10 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
                     }
                 });
                 logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
+
+                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
+                session.getProvenanceReporter().fetch(flowFile, uri, millis);
                 session.transfer(flowFile, REL_SUCCESS);
             }
         } catch (NoNodeAvailableException
@@ -211,6 +219,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
     /**
      * Dispose of ElasticSearch client
      */
+    @Override
     @OnStopped
     public void closeClient() {
         super.closeClient();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e436941/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index ab61f67..1f68c22 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -230,6 +230,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
                             session.transfer(flowFile, REL_FAILURE);
 
                         } else {
+                            session.getProvenanceReporter().send(flowFile, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + responses[i].getIndex());
                             session.transfer(flowFile, REL_SUCCESS);
                         }
                         flowFilesToTransfer.remove(flowFile);
@@ -238,7 +239,12 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
             }
 
             // Transfer any remaining flowfiles to success
-            session.transfer(flowFilesToTransfer, REL_SUCCESS);
+            flowFilesToTransfer.forEach(file -> {
+                session.transfer(file, REL_SUCCESS);
+                // Record provenance event
+                session.getProvenanceReporter().send(file, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" +
+                                context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue());
+            });
 
         } catch (NoNodeAvailableException
                 | ElasticsearchTimeoutException

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e436941/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
index ba22b65..30c42d0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -49,6 +50,8 @@ import java.net.MalformedURLException;
 import java.util.HashMap;
 import java.util.concurrent.ExecutionException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
@@ -97,6 +100,8 @@ public class TestFetchElasticsearch {
         runner.run(1, true, true);
 
         runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
+        assertFalse(runner.getProvenanceEvents().isEmpty());
+        runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.FETCH); });
         final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
         assertNotNull(out);
         out.assertAttributeEquals("doc_id", "28039652140");

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e436941/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 6d6da5a..6da6a29 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -50,6 +51,8 @@ import java.io.InputStream;
 import java.util.HashMap;
 import java.util.concurrent.ExecutionException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -97,6 +100,8 @@ public class TestPutElasticsearch {
         runner.run(1, true, true);
 
         runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        assertFalse(runner.getProvenanceEvents().isEmpty());
+        runner.getProvenanceEvents().forEach(event -> { assertEquals(event.getEventType(), ProvenanceEventType.SEND); });
         final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
         assertNotNull(out);
         out.assertAttributeEquals("doc_id", "28039652140");

Reply via email to