NIFI-1490: better field naming / displayname and description mix up fix

This closes #2994.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa42635
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa42635
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa42635

Branch: refs/heads/master
Commit: 5aa426358802bfac91657fdb8a8a83094239ced8
Parents: c81a135
Author: Endre Zoltan Kovacs <[email protected]>
Authored: Mon Oct 8 13:10:37 2018 +0200
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 15:41:38 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    |  30 +-
 .../standard/servlets/ListenHTTPServlet.java    | 420 +++++++++----------
 2 files changed, 223 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 39c91db..5ea9f3a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -134,21 +134,21 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         .defaultValue(String.valueOf(HttpServletResponse.SC_OK))
         .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
         .build();
-    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
-        .name("max-request-size")
-        .displayName("Max Request Size")
+    public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new 
PropertyDescriptor.Builder()
+        .name("multipart-request-max-size")
+        .displayName("Multipart Request Max Size")
         .description("The max size of the request. Only applies for requests 
with Content-Type: multipart/form-data, "
                 + "and is used to prevent denial of service type of attacks, 
to prevent filling up the heap or disk space")
         .required(true)
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .defaultValue("1 MB")
         .build();
-    public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
-        .name("in-memory-file-size-threshold")
-        .displayName("The threshold size, at which the contents of an incoming 
file would be written to disk. "
+    public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+        .name("multipart-read-buffer-size")
+        .displayName("Multipart Read Buffer Size")
+        .description("The threshold size, at which the contents of an incoming 
file would be written to disk. "
                 + "Only applies for requests with Content-Type: 
multipart/form-data. "
                 + "It is used to prevent denial of service type of attacks, to 
prevent filling up the heap or disk space.")
-        .description("The threshold value for writing a file to disk.")
         .required(true)
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .defaultValue("512 KB")
@@ -164,8 +164,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = 
"streamThrottler";
     public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
     public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode";
-    public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = 
"maxRequestSize";
-    public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD 
= "inMemoryFileSizeThreshold";
+    public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = 
"multipartRequestMaxSize";
+    public static final String CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE = 
"multipartReadBufferSize";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap 
= new ConcurrentHashMap<>();
@@ -187,8 +187,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         descriptors.add(MAX_UNCONFIRMED_TIME);
         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         descriptors.add(RETURN_CODE);
-        descriptors.add(MAX_REQUEST_SIZE);
-        descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD);
+        descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
+        descriptors.add(MULTIPART_READ_BUFFER_SIZE);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -237,8 +237,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         final Double maxBytesPerSecond = 
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? 
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
         final int returnCode = context.getProperty(RETURN_CODE).asInteger();
-        long maxRequestSize = 
context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
-        int inMemoryFileSizeThreshold = 
context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue();
+        long requestMaxSize = 
context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
+        int readBufferSize = 
context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         throttlerRef.set(streamThrottler);
 
         final boolean needClientAuth = sslContextService != null && 
sslContextService.getTrustStoreFile() != null;
@@ -321,8 +321,8 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, 
streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
-        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, 
maxRequestSize);
-        
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, 
inMemoryFileSizeThreshold);
+        
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, 
requestMaxSize);
+        
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE, 
readBufferSize);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, 
Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 8803f0b..07ccd69 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -16,6 +16,37 @@
  */
 package org.apache.nifi.processors.standard.servlets;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.standard.ListenHTTP;
+import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.FlowFileUnpackager;
+import org.apache.nifi.util.FlowFileUnpackagerV1;
+import org.apache.nifi.util.FlowFileUnpackagerV2;
+import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.eclipse.jetty.server.Request;
+
+import javax.servlet.MultipartConfigElement;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,40 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 
-import javax.servlet.MultipartConfigElement;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.Part;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.standard.ListenHTTP;
-import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
-import org.apache.nifi.stream.io.StreamThrottler;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.FlowFileUnpackager;
-import org.apache.nifi.util.FlowFileUnpackagerV1;
-import org.apache.nifi.util.FlowFileUnpackagerV2;
-import org.apache.nifi.util.FlowFileUnpackagerV3;
-import org.eclipse.jetty.server.Request;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
-
 @Path("")
 public class ListenHTTPServlet extends HttpServlet {
 
@@ -105,8 +102,8 @@ public class ListenHTTPServlet extends HttpServlet {
     private StreamThrottler streamThrottler;
     private String basePath;
     private int returnCode;
-    private long maxRequestSize;
-    private int inMemoryFileSizeThreshold;
+    private long multipartRequestMaxSize;
+    private int multipartReadBufferSize;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -121,8 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
         this.streamThrottler = (StreamThrottler) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
         this.basePath = (String) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
         this.returnCode = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE);
-        this.maxRequestSize = (long) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE);
-        this.inMemoryFileSizeThreshold = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD);
+        this.multipartRequestMaxSize = (long) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
+        this.multipartReadBufferSize = (int) 
context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
     }
 
     @Override
@@ -204,11 +201,11 @@ public class ListenHTTPServlet extends HttpServlet {
                 logger.debug("Received request from " + 
request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + 
contentType + ", gzip=" + contentGzipped);
             }
 
-            Set<FlowFile> flowFileSet = null;
+            Set<FlowFile> flowFileSet;
             if (!Strings.isNullOrEmpty(request.getContentType()) && 
request.getContentType().contains("multipart/form-data")) {
-              flowFileSet = handleMultipartRequest(request, session, 
foundSubject, destinationIsLegacyNiFi, contentType, in);
+                flowFileSet = handleMultipartRequest(request, session, 
foundSubject);
             } else {
-              flowFileSet = handleRequest(request, session, foundSubject, 
destinationIsLegacyNiFi, contentType, in);
+                flowFileSet = handleRequest(request, session, foundSubject, 
destinationIsLegacyNiFi, contentType, in);
             }
             proceedFlow(request, response, session, foundSubject, createHold, 
flowFileSet);
         } catch (final Throwable t) {
@@ -217,199 +214,198 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     private void handleException(final HttpServletRequest request, final 
HttpServletResponse response,
-        final ProcessSession session, String foundSubject, final Throwable t) 
throws IOException {
-      session.rollback();
-      logger.error("Unable to receive file from Remote Host: [{}] SubjectDN 
[{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
-      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 
t.toString());
+                                 final ProcessSession session, String 
foundSubject, final Throwable t) throws IOException {
+        session.rollback();
+        logger.error("Unable to receive file from Remote Host: [{}] SubjectDN 
[{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, 
t.toString());
     }
 
-    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, 
ProcessSession session, String foundSubject,
-        boolean destinationIsLegacyNiFi, String contentType, InputStream in) 
throws IOException, ServletException {
-      Set<FlowFile> flowFileSet = new HashSet<>();
-      String tempDir = System.getProperty("java.io.tmpdir");
-      request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new 
MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, 
inMemoryFileSizeThreshold));
-      List<Part> requestParts = ImmutableList.copyOf(request.getParts());
-      for (int i = 0; i < requestParts.size(); i++) {
-        Part part = requestParts.get(i);
-        FlowFile flowFile = session.create();
-        try (OutputStream flowFileOoutputStream = session.write(flowFile)) {
-          StreamUtils.copy(part.getInputStream(), flowFileOoutputStream);
+    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, 
ProcessSession session, String foundSubject) throws IOException, 
IllegalStateException, ServletException {
+        Set<FlowFile> flowFileSet = new HashSet<>();
+        String tempDir = System.getProperty("java.io.tmpdir");
+        request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new 
MultipartConfigElement(tempDir, multipartRequestMaxSize, 
multipartRequestMaxSize, multipartReadBufferSize));
+        List<Part> requestParts = ImmutableList.copyOf(request.getParts());
+        for (int i = 0; i < requestParts.size(); i++) {
+            Part part = requestParts.get(i);
+            FlowFile flowFile = session.create();
+            try (OutputStream flowFileOutputStream = session.write(flowFile)) {
+                StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
+            }
+            flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
+            flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, 
requestParts.size());
+            flowFileSet.add(flowFile);
         }
-        flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
-        flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, 
requestParts.size());
-        flowFileSet.add(flowFile);
-      }
-      return flowFileSet;
+        return flowFileSet;
     }
 
-    private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part 
part, FlowFile flowFile, final int i, final int allPartsCount) {
-      Map<String, String> attributes = new HashMap<>();
-      for (String headerName : part.getHeaderNames()) {
-        final String headerValue = part.getHeader(headerName);
-        putAttribute(attributes, "http.headers.multipart." + headerName, 
headerValue);
-      }
-      putAttribute(attributes, "http.multipart.size", part.getSize());
-      putAttribute(attributes, "http.multipart.content.type", 
part.getContentType());
-      putAttribute(attributes, "http.multipart.name", part.getName());
-      putAttribute(attributes, "http.multipart.filename", 
part.getSubmittedFileName());
-      putAttribute(attributes, "http.multipart.fragments.sequence.number", 
i+1);
-      putAttribute(attributes, "http.multipart.fragments.total.number", 
allPartsCount);
-      return session.putAllAttributes(flowFile, attributes);
+    private FlowFile savePartDetailsAsAttributes(final ProcessSession session, 
final Part part, final FlowFile flowFile, final int sequenceNumber, final int 
allPartsCount) {
+        final Map<String, String> attributes = new HashMap<>();
+        for (String headerName : part.getHeaderNames()) {
+            final String headerValue = part.getHeader(headerName);
+            putAttribute(attributes, "http.headers.multipart." + headerName, 
headerValue);
+        }
+        putAttribute(attributes, "http.multipart.size", part.getSize());
+        putAttribute(attributes, "http.multipart.content.type", 
part.getContentType());
+        putAttribute(attributes, "http.multipart.name", part.getName());
+        putAttribute(attributes, "http.multipart.filename", 
part.getSubmittedFileName());
+        putAttribute(attributes, "http.multipart.fragments.sequence.number", 
sequenceNumber + 1);
+        putAttribute(attributes, "http.multipart.fragments.total.number", 
allPartsCount);
+        return session.putAllAttributes(flowFile, attributes);
     }
 
     private Set<FlowFile> handleRequest(final HttpServletRequest request, 
final ProcessSession session,
-        String foundSubject, final boolean destinationIsLegacyNiFi, final 
String contentType, final InputStream in) {
-      FlowFile flowFile = null;
-      String holdUuid = null;
-      final AtomicBoolean hasMoreData = new AtomicBoolean(false);
-      final FlowFileUnpackager unpackager;
-      if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV3();
-      } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV2();
-      } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
-          unpackager = new FlowFileUnpackagerV1();
-      } else {
-          unpackager = null;
-      }
-
-      final Set<FlowFile> flowFileSet = new HashSet<>();
-
-      do {
-          final long startNanos = System.nanoTime();
-          final Map<String, String> attributes = new HashMap<>();
-          flowFile = session.create();
-          flowFile = session.write(flowFile, new OutputStreamCallback() {
-              @Override
-              public void process(final OutputStream rawOut) throws 
IOException {
-                  try (final BufferedOutputStream bos = new 
BufferedOutputStream(rawOut, 65536)) {
-                      if (unpackager == null) {
-                          IOUtils.copy(in, bos);
-                          hasMoreData.set(false);
-                      } else {
-                          attributes.putAll(unpackager.unpackageFlowFile(in, 
bos));
-
-                          if (destinationIsLegacyNiFi) {
-                              if (attributes.containsKey("nf.file.name")) {
-                                  // for backward compatibility with old 
nifi...
-                                  
attributes.put(CoreAttributes.FILENAME.key(), 
attributes.remove("nf.file.name"));
-                              }
-
-                              if (attributes.containsKey("nf.file.path")) {
-                                  attributes.put(CoreAttributes.PATH.key(), 
attributes.remove("nf.file.path"));
-                              }
-                          }
-
-                          hasMoreData.set(unpackager.hasMoreData());
-                      }
-                  }
-              }
-          });
-
-          final long transferNanos = System.nanoTime() - startNanos;
-          final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
-          // put metadata on flowfile
-          final String nameVal = 
request.getHeader(CoreAttributes.FILENAME.key());
-          if (StringUtils.isNotBlank(nameVal)) {
-              attributes.put(CoreAttributes.FILENAME.key(), nameVal);
-          }
-
-          String sourceSystemFlowFileIdentifier = 
attributes.get(CoreAttributes.UUID.key());
-          if (sourceSystemFlowFileIdentifier != null) {
-              sourceSystemFlowFileIdentifier = "urn:nifi:" + 
sourceSystemFlowFileIdentifier;
-
-              // If we receveied a UUID, we want to give the FlowFile a new 
UUID and register the sending system's
-              // identifier as the SourceSystemFlowFileIdentifier field in the 
Provenance RECEIVE event
-              attributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
-          }
-
-          flowFile = session.putAllAttributes(flowFile, attributes);
-          flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
-          session.getProvenanceReporter().receive(flowFile, 
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote 
DN=" + foundSubject, transferMillis);
-          flowFileSet.add(flowFile);
-
-          if (holdUuid == null) {
-              holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
-          }
-      } while (hasMoreData.get());
-      return flowFileSet;
+                                        String foundSubject, final boolean 
destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+        FlowFile flowFile = null;
+        String holdUuid = null;
+        final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+        final FlowFileUnpackager unpackager;
+        if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV3();
+        } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV2();
+        } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV1();
+        } else {
+            unpackager = null;
+        }
+
+        final Set<FlowFile> flowFileSet = new HashSet<>();
+
+        do {
+            final long startNanos = System.nanoTime();
+            final Map<String, String> attributes = new HashMap<>();
+            flowFile = session.create();
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws 
IOException {
+                    try (final BufferedOutputStream bos = new 
BufferedOutputStream(rawOut, 65536)) {
+                        if (unpackager == null) {
+                            IOUtils.copy(in, bos);
+                            hasMoreData.set(false);
+                        } else {
+                            attributes.putAll(unpackager.unpackageFlowFile(in, 
bos));
+
+                            if (destinationIsLegacyNiFi) {
+                                if (attributes.containsKey("nf.file.name")) {
+                                    // for backward compatibility with old 
nifi...
+                                    
attributes.put(CoreAttributes.FILENAME.key(), 
attributes.remove("nf.file.name"));
+                                }
+
+                                if (attributes.containsKey("nf.file.path")) {
+                                    attributes.put(CoreAttributes.PATH.key(), 
attributes.remove("nf.file.path"));
+                                }
+                            }
+
+                            hasMoreData.set(unpackager.hasMoreData());
+                        }
+                    }
+                }
+            });
+
+            final long transferNanos = System.nanoTime() - startNanos;
+            final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+            // put metadata on flowfile
+            final String nameVal = 
request.getHeader(CoreAttributes.FILENAME.key());
+            if (StringUtils.isNotBlank(nameVal)) {
+                attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+            }
+
+            String sourceSystemFlowFileIdentifier = 
attributes.get(CoreAttributes.UUID.key());
+            if (sourceSystemFlowFileIdentifier != null) {
+                sourceSystemFlowFileIdentifier = "urn:nifi:" + 
sourceSystemFlowFileIdentifier;
+
+                // If we receveied a UUID, we want to give the FlowFile a new 
UUID and register the sending system's
+                // identifier as the SourceSystemFlowFileIdentifier field in 
the Provenance RECEIVE event
+                attributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
+            }
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile);
+            session.getProvenanceReporter().receive(flowFile, 
request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote 
DN=" + foundSubject, transferMillis);
+            flowFileSet.add(flowFile);
+
+            if (holdUuid == null) {
+                holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+            }
+        } while (hasMoreData.get());
+        return flowFileSet;
     }
 
     protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest 
request, final ProcessSession session,
-        String foundSubject, FlowFile flowFile) {
-      Map<String, String> attributes = new HashMap<>();
-      addMatchingRequestHeaders(request, attributes);
-      flowFile = session.putAllAttributes(flowFile, attributes);
-      flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
-      flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
-      flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", 
foundSubject);
-      return flowFile;
+                                                      String foundSubject, 
FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", 
request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, 
"restlistener.remote.user.dn", foundSubject);
+        return flowFile;
     }
 
     private void addMatchingRequestHeaders(final HttpServletRequest request, 
final Map<String, String> attributes) {
-      // put arbitrary headers on flow file
-      for (Enumeration<String> headerEnum = request.getHeaderNames();
-              headerEnum.hasMoreElements();) {
-          String headerName = headerEnum.nextElement();
-          if (headerPattern != null && 
headerPattern.matcher(headerName).matches()) {
-              String headerValue = request.getHeader(headerName);
-              attributes.put(headerName, headerValue);
-          }
-      }
+        // put arbitrary headers on flow file
+        for (Enumeration<String> headerEnum = request.getHeaderNames();
+             headerEnum.hasMoreElements(); ) {
+            String headerName = headerEnum.nextElement();
+            if (headerPattern != null && 
headerPattern.matcher(headerName).matches()) {
+                String headerValue = request.getHeader(headerName);
+                attributes.put(headerName, headerValue);
+            }
+        }
     }
 
     protected void proceedFlow(final HttpServletRequest request, final 
HttpServletResponse response,
-        final ProcessSession session, String foundSubject, final boolean 
createHold,
-        final Set<FlowFile> flowFileSet) throws IOException, 
UnsupportedEncodingException {
-      if (createHold) {
-          String uuid = UUID.randomUUID().toString();
-
-          if (flowFileMap.containsKey(uuid)) {
-              uuid = UUID.randomUUID().toString();
-          }
-
-          final FlowFileEntryTimeWrapper wrapper = new 
FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), 
request.getRemoteHost());
-          FlowFileEntryTimeWrapper previousWrapper;
-          do {
-            previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
-              if (previousWrapper != null) {
-                  uuid = UUID.randomUUID().toString();
-              }
-          } while (previousWrapper != null);
-
-          response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-          final String ackUri =  "/" + basePath + "/holds/" + uuid;
-          response.addHeader(LOCATION_HEADER_NAME, ackUri);
-          response.addHeader(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE);
-          response.getOutputStream().write(ackUri.getBytes("UTF-8"));
-          if (logger.isDebugEnabled()) {
-              logger.debug("Ingested {} from Remote Host: [{}] Port [{}] 
SubjectDN [{}]; placed hold on these {} files with ID {}",
-                      new Object[]{flowFileSet, request.getRemoteHost(), 
request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
-          }
-      } else {
-          response.setStatus(this.returnCode);
-          logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN 
[{}]; transferring to 'success'",
-                  new Object[]{request.getRemoteHost(), 
request.getRemotePort(), foundSubject});
-
-          session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
-          session.commit();
-      }
+                               final ProcessSession session, String 
foundSubject, final boolean createHold,
+                               final Set<FlowFile> flowFileSet) throws 
IOException, UnsupportedEncodingException {
+        if (createHold) {
+            String uuid = UUID.randomUUID().toString();
+
+            if (flowFileMap.containsKey(uuid)) {
+                uuid = UUID.randomUUID().toString();
+            }
+
+            final FlowFileEntryTimeWrapper wrapper = new 
FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), 
request.getRemoteHost());
+            FlowFileEntryTimeWrapper previousWrapper;
+            do {
+                previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+                if (previousWrapper != null) {
+                    uuid = UUID.randomUUID().toString();
+                }
+            } while (previousWrapper != null);
+
+            response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+            final String ackUri = "/" + basePath + "/holds/" + uuid;
+            response.addHeader(LOCATION_HEADER_NAME, ackUri);
+            response.addHeader(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE);
+            response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+            if (logger.isDebugEnabled()) {
+                logger.debug("Ingested {} from Remote Host: [{}] Port [{}] 
SubjectDN [{}]; placed hold on these {} files with ID {}",
+                    new Object[]{flowFileSet, request.getRemoteHost(), 
request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+            }
+        } else {
+            response.setStatus(this.returnCode);
+            logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN 
[{}]; transferring to 'success'",
+                new Object[]{request.getRemoteHost(), request.getRemotePort(), 
foundSubject});
+
+            session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+            session.commit();
+        }
     }
 
     private void putAttribute(final Map<String, String> map, final String key, 
final Object value) {
-      if (value == null) {
-          return;
-      }
+        if (value == null) {
+            return;
+        }
 
-      putAttribute(map, key, value.toString());
-  }
+        putAttribute(map, key, value.toString());
+    }
 
     private void putAttribute(final Map<String, String> map, final String key, 
final String value) {
-      if (value == null) {
-          return;
-      }
+        if (value == null) {
+            return;
+        }
 
-      map.put(key, value);
-  }
+        map.put(key, value);
+    }
 }

Reply via email to