Repository: nifi
Updated Branches:
  refs/heads/master 993d3cd78 -> 9064b9763


NIFI-1816 Added provenance event SEND in HandleHttpResponse

This closes #389


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9064b976
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9064b976
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9064b976

Branch: refs/heads/master
Commit: 9064b976317e316f42ac279dd026105b54a17ddb
Parents: 993d3cd
Author: Pierre Villard <[email protected]>
Authored: Thu Apr 28 18:59:42 2016 +0200
Committer: jpercivall <[email protected]>
Committed: Sat May 7 11:51:26 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/HandleHttpRequest.java  | 26 ++++++------
 .../processors/standard/HandleHttpResponse.java | 24 ++++++++---
 .../processors/standard/util/HTTPUtils.java     | 42 ++++++++++++++++++++
 .../standard/TestHandleHttpResponse.java        | 14 ++++++-
 4 files changed, 86 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9064b976/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index b0e91b4..f3b065a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -62,6 +62,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.HttpConfiguration;
@@ -80,20 +81,22 @@ import com.sun.jersey.api.client.ClientResponse.Status;
 @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. 
For each request, creates a FlowFile and transfers to 'success'. "
         + "This Processor is designed to be used in conjunction with the 
HandleHttpResponse Processor in order to create a Web Service")
 @WritesAttributes({
-    @WritesAttribute(attribute = "http.context.identifier", description = "An 
identifier that allows the HandleHttpRequest and HandleHttpResponse "
+    @WritesAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "An 
identifier that allows the HandleHttpRequest and HandleHttpResponse "
             + "to coordinate which FlowFile belongs to which HTTP 
Request/Response."),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type of 
the data, according to the HTTP Header \"Content-Type\""),
     @WritesAttribute(attribute = "http.servlet.path", description = "The part 
of the request URL that is considered the Servlet Path"),
     @WritesAttribute(attribute = "http.context.path", description = "The part 
of the request URL that is considered to be the Context Path"),
     @WritesAttribute(attribute = "http.method", description = "The HTTP Method 
that was used for the request, such as GET or POST"),
+    @WritesAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP 
address/hostname of the server"),
+    @WritesAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening 
port of the server"),
     @WritesAttribute(attribute = "http.query.string", description = "The query 
string portion of hte Request URL"),
-    @WritesAttribute(attribute = "http.remote.host", description = "The 
hostname of the requestor"),
+    @WritesAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = 
"The hostname of the requestor"),
     @WritesAttribute(attribute = "http.remote.addr", description = "The 
hostname:port combination of the requestor"),
     @WritesAttribute(attribute = "http.remote.user", description = "The 
username of the requestor"),
-    @WritesAttribute(attribute = "http.request.uri", description = "The full 
Request URL"),
+    @WritesAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = 
"The full Request URL"),
     @WritesAttribute(attribute = "http.auth.type", description = "The type of 
HTTP Authorization used"),
     @WritesAttribute(attribute = "http.principal.name", description = "The 
name of the authenticated user making the request"),
-    @WritesAttribute(attribute = "http.subject.dn", description = "The 
Distinguished Name of the requestor. This value will not be populated "
+    @WritesAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "The 
Distinguished Name of the requestor. This value will not be populated "
             + "unless the Processor is configured to use an SSLContext 
Service"),
     @WritesAttribute(attribute = "http.issuer.dn", description = "The 
Distinguished Name of the entity that issued the Subject's certificate. "
             + "This value will not be populated unless the Processor is 
configured to use an SSLContext Service"),
@@ -104,7 +107,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
         classNames = {"org.apache.nifi.http.StandardHttpContextMap", 
"org.apache.nifi.ssl.StandardSSLContextService"})
 public class HandleHttpRequest extends AbstractProcessor {
 
-    public static final String HTTP_CONTEXT_ID = "http.context.identifier";
     private static final Pattern URL_QUERY_PARAM_DELIMITER = 
Pattern.compile("&");
 
     // Allowable values for client auth
@@ -493,20 +495,20 @@ public class HandleHttpRequest extends AbstractProcessor {
         final String contextIdentifier = UUID.randomUUID().toString();
         final Map<String, String> attributes = new HashMap<>();
         try {
-            putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
+            putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, 
contextIdentifier);
             putAttribute(attributes, "mime.type", request.getContentType());
             putAttribute(attributes, "http.servlet.path", 
request.getServletPath());
             putAttribute(attributes, "http.context.path", 
request.getContextPath());
             putAttribute(attributes, "http.method", request.getMethod());
             putAttribute(attributes, "http.local.addr", 
request.getLocalAddr());
-            putAttribute(attributes, "http.local.name", 
request.getLocalName());
+            putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, 
request.getLocalName());
             if (request.getQueryString() != null) {
                 putAttribute(attributes, "http.query.string", 
URLDecoder.decode(request.getQueryString(), charset));
             }
-            putAttribute(attributes, "http.remote.host", 
request.getRemoteHost());
+            putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, 
request.getRemoteHost());
             putAttribute(attributes, "http.remote.addr", 
request.getRemoteAddr());
             putAttribute(attributes, "http.remote.user", 
request.getRemoteUser());
-            putAttribute(attributes, "http.request.uri", 
request.getRequestURI());
+            putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, 
request.getRequestURI());
             putAttribute(attributes, "http.request.url", 
request.getRequestURL().toString());
             putAttribute(attributes, "http.auth.type", request.getAuthType());
 
@@ -517,7 +519,7 @@ public class HandleHttpRequest extends AbstractProcessor {
             putAttribute(attributes, "http.character.encoding", 
request.getCharacterEncoding());
             putAttribute(attributes, "http.locale", request.getLocale());
             putAttribute(attributes, "http.server.name", 
request.getServerName());
-            putAttribute(attributes, "http.server.port", 
request.getServerPort());
+            putAttribute(attributes, HTTPUtils.HTTP_PORT, 
request.getServerPort());
 
             final Enumeration<String> paramEnumeration = 
request.getParameterNames();
             while (paramEnumeration.hasMoreElements()) {
@@ -585,7 +587,7 @@ public class HandleHttpRequest extends AbstractProcessor {
             subjectDn = cert.getSubjectDN().getName();
             final String issuerDn = cert.getIssuerDN().getName();
 
-            putAttribute(attributes, "http.subject.dn", subjectDn);
+            putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
             putAttribute(attributes, "http.issuer.dn", issuerDn);
         } else {
             subjectDn = null;
@@ -613,7 +615,7 @@ public class HandleHttpRequest extends AbstractProcessor {
         }
 
         final long receiveMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        session.getProvenanceReporter().receive(flowFile, 
request.getRequestURI(), "Received from " + request.getRemoteAddr() + 
(subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
+        session.getProvenanceReporter().receive(flowFile, 
HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + 
(subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
         session.transfer(flowFile, REL_SUCCESS);
         getLogger().info("Transferring {} to 'success'; received from {}", new 
Object[]{flowFile, request.getRemoteAddr()});
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9064b976/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
index 1a24e6e..e15abcc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import javax.servlet.http.HttpServletResponse;
@@ -29,6 +30,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -41,19 +43,26 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
+import org.apache.nifi.util.StopWatch;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"http", "https", "response", "egress", "web service"})
 @CapabilityDescription("Sends an HTTP Response to the Requestor that generated 
a FlowFile. This Processor is designed to be used in conjunction with "
         + "the HandleHttpRequest in order to create a web service.")
 @DynamicProperty(name = "An HTTP header name", value = "An HTTP header value", 
description = "These HTTPHeaders are set in the HTTP Response")
-@ReadsAttribute(attribute = "http.context.identifier", description = "The 
value of this attribute is used to lookup the HTTP Response so that the "
-        + "proper message can be sent back to the requestor. If this attribute 
is missing, the FlowFile will be routed to 'failure.'")
+@ReadsAttributes({
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "The 
value of this attribute is used to lookup the HTTP Response so that the "
+        + "proper message can be sent back to the requestor. If this attribute 
is missing, the FlowFile will be routed to 'failure.'"),
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = 
"Value of the URI requested by the client. Used for provenance event."),
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = "IP 
address of the client. Used for provenance event."),
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP 
address/hostname of the server. Used for provenance event."),
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening 
port of the server. Used for provenance event."),
+    @ReadsAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "SSL 
distinguished name (if any). Used for provenance event.")})
 @SeeAlso(value = {HandleHttpRequest.class}, classNames = 
{"org.apache.nifi.http.StandardHttpContextMap", 
"org.apache.nifi.ssl.StandardSSLContextService"})
 public class HandleHttpResponse extends AbstractProcessor {
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
-    public static final String HTTP_CONTEXT_ID = "http.context.identifier";
 
     public static final PropertyDescriptor STATUS_CODE = new 
PropertyDescriptor.Builder()
             .name("HTTP Status Code")
@@ -113,10 +122,12 @@ public class HandleHttpResponse extends AbstractProcessor 
{
             return;
         }
 
-        final String contextIdentifier = 
flowFile.getAttribute(HTTP_CONTEXT_ID);
+        final StopWatch stopWatch = new StopWatch(true);
+
+        final String contextIdentifier = 
flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
         if (contextIdentifier == null) {
             session.transfer(flowFile, REL_FAILURE);
-            getLogger().warn("Failed to respond to HTTP request for {} because 
FlowFile did not have an 'http.context.identifier' attribute",
+            getLogger().warn("Failed to respond to HTTP request for {} because 
FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
                     new Object[]{flowFile});
             return;
         }
@@ -132,7 +143,7 @@ public class HandleHttpResponse extends AbstractProcessor {
         if (response == null) {
             session.transfer(flowFile, REL_FAILURE);
             getLogger().error("Failed to respond to HTTP request for {} 
because FlowFile had an '{}' attribute of {} but could not find an HTTP 
Response Object for this identifier",
-                    new Object[]{flowFile, HTTP_CONTEXT_ID, 
contextIdentifier});
+                    new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, 
contextIdentifier});
             return;
         }
 
@@ -168,6 +179,7 @@ public class HandleHttpResponse extends AbstractProcessor {
             return;
         }
 
+        session.getProvenanceReporter().send(flowFile, 
HTTPUtils.getURI(flowFile.getAttributes()), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
         session.transfer(flowFile, REL_SUCCESS);
         getLogger().info("Successfully responded to HTTP Request for {} with 
status code {}", new Object[]{flowFile, statusCode});
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9064b976/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
new file mode 100644
index 0000000..937554d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.util.Map;
+
+public class HTTPUtils {
+
+    public static final String HTTP_REQUEST_URI = "http.request.uri";
+    public static final String HTTP_REMOTE_HOST = "http.remote.host";
+    public static final String HTTP_LOCAL_NAME = "http.local.name";
+    public static final String HTTP_PORT = "http.server.port";
+    public static final String HTTP_SSL_CERT = "http.subject.dn";
+    public static final String HTTP_CONTEXT_ID = "http.context.identifier";
+
+    public static String getURI(Map<String, String> map) {
+        final String client = map.get(HTTP_REMOTE_HOST);
+        final String server = map.get(HTTP_LOCAL_NAME);
+        final String port = map.get(HTTP_PORT);
+        final String uri = map.get(HTTP_REQUEST_URI);
+        if(map.get(HTTP_SSL_CERT) == null) {
+            return "http://"; + client + "@" + server + ":" + port + uri;
+        } else {
+            return "https://"; + client + "@" + server + ":" + port + uri;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9064b976/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
index 84fb26d..bdf1cdc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -39,6 +39,8 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.http.HttpContextMap;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -63,7 +65,12 @@ public class TestHandleHttpResponse {
         runner.setProperty("no-valid-attr", "${no-valid-attr}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
+        attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
+        attributes.put(HTTPUtils.HTTP_PORT, "8443");
+        attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
+        attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
         attributes.put("my-attr", "hello");
         attributes.put("status.code", "201");
 
@@ -72,6 +79,9 @@ public class TestHandleHttpResponse {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 
1);
+        assertTrue(runner.getProvenanceEvents().size() == 1);
+        assertEquals(ProvenanceEventType.SEND, 
runner.getProvenanceEvents().get(0).getEventType());
+        assertEquals("https://client@server:8443/test";, 
runner.getProvenanceEvents().get(0).getTransitUri());
 
         assertEquals("hello", contextMap.baos.toString());
         assertEquals("hello", contextMap.headersSent.get("my-attr"));
@@ -94,7 +104,7 @@ public class TestHandleHttpResponse {
         runner.setProperty("no-valid-attr", "${no-valid-attr}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
         attributes.put("my-attr", "hello");
         attributes.put("status.code", "201");
 

Reply via email to