Repository: nifi
Updated Branches:
  refs/heads/master 8398ea77b -> 5aa426358


NIFI-1490: multipart/form-data support for ListenHTTP processor
- introducing an in-memory-file-size-threshold, above which the incoming file
is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c81a1351
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c81a1351
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c81a1351

Branch: refs/heads/master
Commit: c81a135161c7af106117dcc616706798c4564669
Parents: 8398ea7
Author: Endre Zoltan Kovacs <[email protected]>
Authored: Thu Sep 6 17:33:33 2018 +0200
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 15:41:19 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    |  29 +-
 .../standard/servlets/ListenHTTPServlet.java    | 329 ++++++++++++-------
 .../processors/standard/TestListenHTTP.java     | 174 ++++++++--
 3 files changed, 392 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 209e6d1..39c91db 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,6 +134,25 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         .defaultValue(String.valueOf(HttpServletResponse.SC_OK))
         .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
         .build();
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+        .name("max-request-size")
+        .displayName("Max Request Size")
+        .description("The max size of the request. Only applies for requests 
with Content-Type: multipart/form-data, "
+                + "and is used to prevent denial of service type of attacks, 
to prevent filling up the heap or disk space")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
+    public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
+        .name("in-memory-file-size-threshold")
+        .displayName("The threshold size, at which the contents of an incoming 
file would be written to disk. "
+                + "Only applies for requests with Content-Type: 
multipart/form-data. "
+                + "It is used to prevent denial of service type of attacks, to 
prevent filling up the heap or disk space.")
+        .description("The threshold value for writing a file to disk.")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("512 KB")
+        .build();
 
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -145,6 +164,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = 
"streamThrottler";
     public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
     public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
+    public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = 
"maxRequestSize";
+    public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD 
= "inMemoryFileSizeThreshold";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap 
= new ConcurrentHashMap<>();
@@ -166,6 +187,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         descriptors.add(MAX_UNCONFIRMED_TIME);
         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         descriptors.add(RETURN_CODE);
+        descriptors.add(MAX_REQUEST_SIZE);
+        descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -214,6 +237,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         final Double maxBytesPerSecond = 
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? 
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
+        long maxRequestSize = 
context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
+        int inMemoryFileSizeThreshold = 
context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
         throttlerRef.set(streamThrottler);
 
         final boolean needClientAuth = sslContextService != null && 
sslContextService.getTrustStoreFile() != null;
@@ -295,7 +320,9 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, 
Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, 
streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, 
maxRequestSize);
+        
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, 
inMemoryFileSizeThreshold);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, 
Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 85de6f8..8803f0b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -20,10 +20,12 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.security.cert.X509Certificate;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -34,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
+
+import javax.servlet.MultipartConfigElement;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.MediaType;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.FlowFile;
@@ -54,10 +60,15 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.ListenHTTP;
 import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
 import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFileUnpackager;
 import org.apache.nifi.util.FlowFileUnpackagerV1;
 import org.apache.nifi.util.FlowFileUnpackagerV2;
 import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 
 
 @Path("")
@@ -94,6 +105,8 @@ public class ListenHTTPServlet extends HttpServlet {
     private StreamThrottler streamThrottler;
     private String basePath;
     private int returnCode;
+    private long maxRequestSize;
+    private int inMemoryFileSizeThreshold;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -108,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet {
         this.streamThrottler = (StreamThrottler) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
         this.basePath = (String) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
         this.returnCode = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
+        this.maxRequestSize = (long) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
+        this.inMemoryFileSizeThreshold = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
     }
 
     @Override
@@ -133,8 +148,6 @@ public class ListenHTTPServlet extends HttpServlet {
         } while (sessionFactory == null);
 
         final ProcessSession session = sessionFactory.createSession();
-        FlowFile flowFile = null;
-        String holdUuid = null;
         String foundSubject = null;
         try {
             final long n = filesReceived.getAndIncrement() % 
FILES_BEFORE_CHECKING_DESTINATION_SPACE;
@@ -191,134 +204,212 @@ public class ListenHTTPServlet extends HttpServlet {
                 logger.debug("Received request from " + 
request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + 
contentType + ", gzip=" + contentGzipped);
             }
 
-            final AtomicBoolean hasMoreData = new AtomicBoolean(false);
-            final FlowFileUnpackager unpackager;
-            if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV3();
-            } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV2();
-            } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
-                unpackager = new FlowFileUnpackagerV1();
+            Set<FlowFile> flowFileSet = null;
+            if (!Strings.isNullOrEmpty(request.getContentType()) && 
request.getContentType().contains("multipart/form-data")) {
+              flowFileSet = handleMultipartRequest(request, session, 
foundSubject, destinationIsLegacyNiFi, contentType, in);
             } else {
-                unpackager = null;
+              flowFileSet = handleRequest(request, session, foundSubject, 
destinationIsLegacyNiFi, contentType, in);
             }
+            proceedFlow(request, response, session, foundSubject, createHold, 
flowFileSet);
+        } catch (final Throwable t) {
+            handleException(request, response, session, foundSubject, t);
+        }
+    }
 
-            final Set<FlowFile> flowFileSet = new HashSet<>();
-
-            do {
-                final long startNanos = System.nanoTime();
-                final Map<String, String> attributes = new HashMap<>();
-                flowFile = session.create();
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream rawOut) throws 
IOException {
-                        try (final BufferedOutputStream bos = new 
BufferedOutputStream(rawOut, 65536)) {
-                            if (unpackager == null) {
-                                IOUtils.copy(in, bos);
-                                hasMoreData.set(false);
-                            } else {
-                                
attributes.putAll(unpackager.unpackageFlowFile(in, bos));
-
-                                if (destinationIsLegacyNiFi) {
-                                    if 
(attributes.containsKey("nf.file.name")) {
-                                        // for backward compatibility with old 
nifi...
-                                        
attributes.put(CoreAttributes.FILENAME.key(), 
attributes.remove("nf.file.name"));
-                                    }
-
-                                    if 
(attributes.containsKey("nf.file.path")) {
-                                        
attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
-                                    }
-                                }
-
-                                hasMoreData.set(unpackager.hasMoreData());
-                            }
-                        }
-                    }
-                });
-
-                final long transferNanos = System.nanoTime() - startNanos;
-                final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+    private void handleException(final HttpServletRequest request, final 
HttpServletResponse response,
+        final ProcessSession session, String foundSubject, final Throwable t) 
throws IOException {
+      session.rollback();
+      logger.error("Unable to receive file from Remote Host: [{}] SubjectDN 
[{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 
t.toString());
+    }
 
-                // put metadata on flowfile
-                final String nameVal = 
request.getHeader(CoreAttributes.FILENAME.key());
-                if (StringUtils.isNotBlank(nameVal)) {
-                    attributes.put(CoreAttributes.FILENAME.key(), nameVal);
-                }
+    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, 
ProcessSession session, String foundSubject,
+        boolean destinationIsLegacyNiFi, String contentType, InputStream in) 
throws IOException, ServletException {
+      Set<FlowFile> flowFileSet = new HashSet<>();
+      String tempDir = System.getProperty("java.io.tmpdir");
+      request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new 
MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, 
inMemoryFileSizeThreshold));
+      List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+      for (int i = 0; i < requestParts.size(); i++) {
+        Part part = requestParts.get(i);
+        FlowFile flowFile = session.create();
+        try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
+          StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+        }
+        flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
+        flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, 
requestParts.size());
+        flowFileSet.add(flowFile);
+      }
+      return flowFileSet;
+    }
 
-                // put arbitrary headers on flow file
-                for (Enumeration<String> headerEnum = request.getHeaderNames();
-                        headerEnum.hasMoreElements();) {
-                    String headerName = headerEnum.nextElement();
-                    if (headerPattern != null && 
headerPattern.matcher(headerName).matches()) {
-                        String headerValue = request.getHeader(headerName);
-                        attributes.put(headerName, headerValue);
-                    }
-                }
+    private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part 
part, FlowFile flowFile, final int i, final int allPartsCount) {
+      Map<String, String> attributes = new HashMap<>();
+      for (String headerName : part.getHeaderNames()) {
+        final String headerValue = part.getHeader(headerName);
+        putAttribute(attributes, "http.headers.multipart." + headerName, 
headerValue);
+      }
+      putAttribute(attributes, "http.multipart.size", part.getSize());
+      putAttribute(attributes, "http.multipart.content.type", 
part.getContentType());
+      putAttribute(attributes, "http.multipart.name", part.getName());
+      putAttribute(attributes, "http.multipart.filename", 
part.getSubmittedFileName());
+      putAttribute(attributes, "http.multipart.fragments.sequence.number", 
i+1);
+      putAttribute(attributes, "http.multipart.fragments.total.number", 
allPartsCount);
+      return session.putAllAttributes(flowFile, attributes);
+    }
 
-                String sourceSystemFlowFileIdentifier = 
attributes.get(CoreAttributes.UUID.key());
-                if (sourceSystemFlowFileIdentifier != null) {
-                    sourceSystemFlowFileIdentifier = "urn:nifi:" + 
sourceSystemFlowFileIdentifier;
+    private Set<FlowFile> handleRequest(final HttpServletRequest request, 
final ProcessSession session,
+        String foundSubject, final boolean destinationIsLegacyNiFi, final 
String contentType, final InputStream in) {
+      FlowFile flowFile = null;
+      String holdUuid = null;
+      final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+      final FlowFileUnpackager unpackager;
+      if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV3();
+      } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV2();
+      } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+          unpackager = new FlowFileUnpackagerV1();
+      } else {
+          unpackager = null;
+      }
+
+      final Set<FlowFile> flowFileSet = new HashSet<>();
+
+      do {
+          final long startNanos = System.nanoTime();
+          final Map<String, String> attributes = new HashMap<>();
+          flowFile = session.create();
+          flowFile = session.write(flowFile, new OutputStreamCallback() {
+              @Override
+              public void process(final OutputStream rawOut) throws 
IOException {
+                  try (final BufferedOutputStream bos = new 
BufferedOutputStream(rawOut, 65536)) {
+                      if (unpackager == null) {
+                          IOUtils.copy(in, bos);
+                          hasMoreData.set(false);
+                      } else {
+                          attributes.putAll(unpackager.unpackageFlowFile(in, 
bos));
+
+                          if (destinationIsLegacyNiFi) {
+                              if (attributes.containsKey("nf.file.name")) {
+                                  // for backward compatibility with old 
nifi...
+                                  
attributes.put(CoreAttributes.FILENAME.key(), 
attributes.remove("nf.file.name"));
+                              }
+
+                              if (attributes.containsKey("nf.file.path")) {
+                                  attributes.put(CoreAttributes.PATH.key(), 
attributes.remove("nf.file.path"));
+                              }
+                          }
+
+                          hasMoreData.set(unpackager.hasMoreData());
+                      }
+                  }
+              }
+          });
+
+          final long transferNanos = System.nanoTime() - startNanos;
+          final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+          // put metadata on flowfile
+          final String nameVal = 
request.getHeader(CoreAttributes.FILENAME.key());
+          if (StringUtils.isNotBlank(nameVal)) {
+              attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+          }
+
+          String sourceSystemFlowFileIdentifier = 
attributes.get(CoreAttributes.UUID.key());
+          if (sourceSystemFlowFileIdentifier != null) {
+              sourceSystemFlowFileIdentifier = "urn:nifi:" + 
sourceSystemFlowFileIdentifier;
+
+              // If we receveied a UUID, we want to give the FlowFile a new 
UUID and register the sending system's
+              // identifier as the SourceSystemFlowFileIdentifier field in the 
Provenance RECEIVE event
+              attributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
+          }
+
+          flowFile = session.putAllAttributes(flowFile, attributes);
+          flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
+          session.getProvenanceReporter().receive(flowFile, 
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote 
DN=" + foundSubject, transferMillis);
+          flowFileSet.add(flowFile);
+
+          if (holdUuid == null) {
+              holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+          }
+      } while (hasMoreData.get());
+      return flowFileSet;
+    }
 
-                    // If we receveied a UUID, we want to give the FlowFile a 
new UUID and register the sending system's
-                    // identifier as the SourceSystemFlowFileIdentifier field 
in the Provenance RECEIVE event
-                    attributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
-                }
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest 
request, final ProcessSession session,
+        String foundSubject, FlowFile flowFile) {
+      Map<String, String> attributes = new HashMap<>();
+      addMatchingRequestHeaders(request, attributes);
+      flowFile = session.putAllAttributes(flowFile, attributes);
+      flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
+      flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
+      flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", 
foundSubject);
+      return flowFile;
+    }
 
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.getProvenanceReporter().receive(flowFile, 
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote 
DN=" + foundSubject, transferMillis);
-                flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
-                flowFile = session.putAttribute(flowFile, 
"restlistener.request.uri", request.getRequestURI());
-                flowFile = session.putAttribute(flowFile, 
"restlistener.remote.user.dn", foundSubject);
-                flowFileSet.add(flowFile);
+    private void addMatchingRequestHeaders(final HttpServletRequest request, 
final Map<String, String> attributes) {
+      // put arbitrary headers on flow file
+      for (Enumeration<String> headerEnum = request.getHeaderNames();
+              headerEnum.hasMoreElements();) {
+          String headerName = headerEnum.nextElement();
+          if (headerPattern != null && 
headerPattern.matcher(headerName).matches()) {
+              String headerValue = request.getHeader(headerName);
+              attributes.put(headerName, headerValue);
+          }
+      }
+    }
 
-                if (holdUuid == null) {
-                    holdUuid = 
flowFile.getAttribute(CoreAttributes.UUID.key());
-                }
-            } while (hasMoreData.get());
+    protected void proceedFlow(final HttpServletRequest request, final 
HttpServletResponse response,
+        final ProcessSession session, String foundSubject, final boolean 
createHold,
+        final Set<FlowFile> flowFileSet) throws IOException, 
UnsupportedEncodingException {
+      if (createHold) {
+          String uuid = UUID.randomUUID().toString();
+
+          if (flowFileMap.containsKey(uuid)) {
+              uuid = UUID.randomUUID().toString();
+          }
+
+          final FlowFileEntryTimeWrapper wrapper = new 
FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), 
request.getRemoteHost());
+          FlowFileEntryTimeWrapper previousWrapper;
+          do {
+            previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+              if (previousWrapper != null) {
+                  uuid = UUID.randomUUID().toString();
+              }
+          } while (previousWrapper != null);
+
+          response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+          final String ackUri =  "/" + basePath + "/holds/" + uuid;
+          response.addHeader(LOCATION_HEADER_NAME, ackUri);
+          response.addHeader(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE);
+          response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+          if (logger.isDebugEnabled()) {
+              logger.debug("Ingested {} from Remote Host: [{}] Port [{}] 
SubjectDN [{}]; placed hold on these {} files with ID {}",
+                      new Object[]{flowFileSet, request.getRemoteHost(), 
request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+          }
+      } else {
+          response.setStatus(this.returnCode);
+          logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN 
[{}]; transferring to 'success'",
+                  new Object[]{request.getRemoteHost(), 
request.getRemotePort(), foundSubject});
+
+          session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+          session.commit();
+      }
+    }
 
-            if (createHold) {
-                String uuid = (holdUuid == null) ? 
UUID.randomUUID().toString() : holdUuid;
+    private void putAttribute(final Map<String, String> map, final String key, 
final Object value) {
+      if (value == null) {
+          return;
+      }
 
-                if (flowFileMap.containsKey(uuid)) {
-                    uuid = UUID.randomUUID().toString();
-                }
+      putAttribute(map, key, value.toString());
+  }
 
-                final FlowFileEntryTimeWrapper wrapper = new 
FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), 
request.getRemoteHost());
-                FlowFileEntryTimeWrapper previousWrapper;
-                do {
-                    previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
-                    if (previousWrapper != null) {
-                        uuid = UUID.randomUUID().toString();
-                    }
-                } while (previousWrapper != null);
-
-                response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-                final String ackUri =  "/" + basePath + "/holds/" + uuid;
-                response.addHeader(LOCATION_HEADER_NAME, ackUri);
-                response.addHeader(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE);
-                response.getOutputStream().write(ackUri.getBytes("UTF-8"));
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Ingested {} from Remote Host: [{}] Port [{}] 
SubjectDN [{}]; placed hold on these {} files with ID {}",
-                            new Object[]{flowFileSet, request.getRemoteHost(), 
request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
-                }
-            } else {
-                response.setStatus(this.returnCode);
-                logger.info("Received from Remote Host: [{}] Port [{}] 
SubjectDN [{}]; transferring to 'success' {}",
-                        new Object[]{request.getRemoteHost(), 
request.getRemotePort(), foundSubject, flowFile});
+    private void putAttribute(final Map<String, String> map, final String key, 
final String value) {
+      if (value == null) {
+          return;
+      }
 
-                session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
-                session.commit();
-            }
-        } catch (final Throwable t) {
-            session.rollback();
-            if (flowFile == null) {
-                logger.error("Unable to receive file from Remote Host: [{}] 
SubjectDN [{}] due to {}",
-                        new Object[]{request.getRemoteHost(), foundSubject, 
t});
-            } else {
-                logger.error("Unable to receive file {} from Remote Host: [{}] 
SubjectDN [{}] due to {}",
-                        new Object[]{flowFile, request.getRemoteHost(), 
foundSubject, t});
-            }
-            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 
t.toString());
-        }
-    }
+      map.put(key, value);
+  }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 117a068..f8e9015 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -27,15 +27,31 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -66,7 +82,7 @@ public class TestListenHTTP {
     public void setup() throws IOException {
         proc = new ListenHTTP();
         runner = TestRunners.newTestRunner(proc);
-        availablePort = NetworkUtils.availablePort();;
+        availablePort = NetworkUtils.availablePort();
         runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
         runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
 
@@ -181,8 +197,8 @@ public class TestListenHTTP {
     private int executePOST(String message) throws Exception {
         final SSLContextService sslContextService = 
runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, 
SSLContextService.class);
         final boolean secure = (sslContextService != null);
-        final String scheme = secure ? "https" : "http";
-        final URL url = new URL(scheme + "://localhost:" + availablePort + "/" 
+ HTTP_BASE_PATH);
+        String endpointUrl = buildUrl(secure);
+        final URL url = new URL(endpointUrl);
         HttpURLConnection connection;
 
         if (secure) {
@@ -207,6 +223,10 @@ public class TestListenHTTP {
         return connection.getResponseCode();
     }
 
+    /**
+     * Builds the URL of the ListenHTTP endpoint under test from the port picked for this test
+     * and the configured base path.
+     *
+     * @param secure whether to use the "https" scheme instead of "http"
+     * @return the endpoint URL on localhost
+     */
+    private String buildUrl(final boolean secure) {
+      final String scheme = secure ? "https" : "http";
+      return scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH;
+    }
+
     private void testPOSTRequestsReceived(int returnCode) throws Exception {
         final List<String> messages = new ArrayList<>();
         messages.add("payload 1");
@@ -225,13 +245,29 @@ public class TestListenHTTP {
         mockFlowFiles.get(3).assertContentEquals("payload 2");
     }
 
+    /**
+     * Starts the processor's HTTP server, runs the given request sender on its own thread and
+     * triggers the processor repeatedly until the expected number of flow files reached the
+     * success relationship or ten seconds elapsed, then asserts the transfer count.
+     *
+     * @param sendRequestToWebserver    action that issues the HTTP request(s) against the server
+     * @param numberOfExpectedFlowFiles number of flow files expected on the success relationship
+     * @param returnCode                not used by this method
+     * @throws Exception if starting the server, triggering the processor or sleeping fails
+     */
+    private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles, int returnCode) throws Exception {
+      final ProcessSessionFactory sessionFactory = runner.getProcessSessionFactory();
+      final ProcessContext processContext = runner.getProcessContext();
+      proc.createHttpServer(processContext);
+
+      // The requests are sent concurrently while this thread keeps triggering the processor.
+      final Thread sender = new Thread(sendRequestToWebserver);
+      sender.start();
+
+      final long timeoutMillis = 10000;
+      final long begin = System.currentTimeMillis();
+      int transferred = 0;
+      while (transferred < numberOfExpectedFlowFiles) {
+          if (System.currentTimeMillis() - begin >= timeoutMillis) {
+              // Give up polling; the assertion below reports the shortfall.
+              break;
+          }
+          proc.onTrigger(processContext, sessionFactory);
+          transferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
+          Thread.sleep(100);
+      }
+
+      runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles);
+    }
+
     private void startWebServerAndSendMessages(final List<String> messages, 
int returnCode)
             throws Exception {
 
-        final ProcessSessionFactory processSessionFactory = 
runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.createHttpServer(context);
-
         Runnable sendMessagestoWebServer = () -> {
             try {
                 for (final String message : messages) {
@@ -244,20 +280,8 @@ public class TestListenHTTP {
                 fail("Not expecting error here.");
             }
         };
-        new Thread(sendMessagestoWebServer).start();
-
-        long responseTimeout = 10000;
-
-        int numTransferred = 0;
-        long startTime = System.currentTimeMillis();
-        while (numTransferred < messages.size() && (System.currentTimeMillis() 
- startTime < responseTimeout)) {
-            proc.onTrigger(context, processSessionFactory);
-            numTransferred = 
runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
-            Thread.sleep(100);
-        }
-
-        runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, 
messages.size());
 
+        startWebServerAndSendRequests(sendMessagestoWebServer, 
messages.size(), returnCode);
     }
 
     private SSLContextService configureProcessorSslContextService() throws 
InitializationException {
@@ -287,4 +311,114 @@ public class TestListenHTTP {
         runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, 
SSL_CONTEXT_SERVICE_IDENTIFIER);
         return sslContextService;
     }
+
+
+    /**
+     * Posts one multipart/form-data request consisting of two plain form fields and three file
+     * parts, then verifies that five flow files arrive on the success relationship and that each
+     * one carries the expected http.multipart.* / http.headers.multipart.* attributes.
+     */
+    @Test(/*timeout=10000*/)
+    public void testMultipartFormDataRequest() throws Exception {
+
+      runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+      runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+      runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
+
+      final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
+      final boolean isSecure = (sslContextService != null);
+
+      Runnable sendRequestToWebserver = () -> {
+        try {
+          // Each file part is backed by its own file on disk; the JSON part must not share the
+          // text part's file, otherwise both parts would upload the same concatenated content.
+          MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
+              .addFormDataPart("p1", "v1")
+              .addFormDataPart("p2", "v2")
+              .addFormDataPart("file1", "my-file-text.txt",
+                  RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+              .addFormDataPart("file2", "my-file-data.json",
+                  RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }")))
+              .addFormDataPart("file3", "my-file-binary.bin",
+                  RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+              .build();
+
+          Request request =
+              new Request.Builder()
+                .url(buildUrl(isSecure))
+                .post(multipartBody)
+                .build();
+
+          int timeout = 3000;
+          OkHttpClient client = new OkHttpClient.Builder()
+                .readTimeout(timeout, TimeUnit.MILLISECONDS)
+                .writeTimeout(timeout, TimeUnit.MILLISECONDS)
+                .build();
+
+          try (Response response = client.newCall(request).execute()) {
+            Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
+          }
+        } catch (final Throwable t) {
+          // NOTE(review): this runs on the sender thread, so a failure raised here presumably does
+          // not fail the test directly; it surfaces through the flow file count assertions below.
+          t.printStackTrace();
+          Assert.fail(t.toString());
+        }
+      };
+
+
+      startWebServerAndSendRequests(sendRequestToWebserver, 5, 200);
+
+      runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
+      List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
+      // Part fragments are not processed in the order we submitted them.
+      // We cannot rely on the order we sent them in, so each flow file is looked up by part name.
+      MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+      mff.assertAttributeEquals("http.multipart.name", "p1");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+      mff.assertAttributeEquals("http.multipart.name", "p2");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+      mff.assertAttributeEquals("http.multipart.name", "file1");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+      mff.assertAttributeEquals("http.multipart.name", "file2");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+      mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+      mff.assertAttributeEquals("http.multipart.name", "file3");
+      mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
+      mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
+      mff.assertAttributeExists("http.multipart.size");
+      mff.assertAttributeExists("http.multipart.fragments.sequence.number");
+      mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
+      mff.assertAttributeExists("http.headers.multipart.content-disposition");
+    }
+
+     /**
+      * Creates a byte array filled with random content.
+      *
+      * @param i number of random bytes to generate
+      * @return a new array of exactly {@code i} random bytes
+      */
+     private byte[] generateRandomBinaryData(int i) {
+      // Size the buffer from the argument instead of a fixed 100 so callers get what they ask for.
+      byte[] bytes = new byte[i];
+      new Random().nextBytes(bytes);
+      return bytes;
+    }
+     /**
+      * Creates a UTF-8 text file holding the given strings, written back to back without any
+      * separator, and registers it for deletion when the JVM exits.
+      *
+      * @param fileName path of the file to create
+      * @param lines    the strings to write, in order
+      * @return the file that was written
+      * @throws IOException if a stale file cannot be removed or writing fails
+      */
+     private File createTextFile(String fileName, String... lines) throws IOException {
+      File file = new File(fileName);
+      // Content is appended below, so start from a clean slate: a file left behind by an earlier
+      // (e.g. aborted) run would otherwise keep growing and no longer match the expected content.
+      if (file.exists() && !file.delete()) {
+        throw new IOException("Could not remove stale test file " + file.getAbsolutePath());
+      }
+      file.deleteOnExit();
+      for (String string : lines) {
+        Files.append(string, file, Charsets.UTF_8);
+      }
+      return file;
+    }
+     /**
+      * Returns the first flow file whose attribute {@code attributeName} equals
+      * {@code attributeValue}; fails the test when no flow file matches.
+      *
+      * @param flowFilesForRelationship the flow files to search
+      * @param attributeName            name of the attribute to compare
+      * @param attributeValue           value the attribute must have
+      * @return the first matching flow file
+      */
+     protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
+      // Compare from the expected value's side: a flow file that lacks the attribute (getAttribute
+      // may return null) must simply be skipped instead of aborting the search with an NPE.
+      Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship,
+          ff -> attributeValue != null && attributeValue.equals(ff.getAttribute(attributeName)));
+      Assert.assertTrue(String.format("No flow file found with attribute %s=%s", attributeName, attributeValue), optional.isPresent());
+      return optional.get();
+    }
 }

Reply via email to