NIFI-1490: better field naming / displayname and description mix up fix This closes #2994.
Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa42635 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa42635 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa42635 Branch: refs/heads/master Commit: 5aa426358802bfac91657fdb8a8a83094239ced8 Parents: c81a135 Author: Endre Zoltan Kovacs <[email protected]> Authored: Mon Oct 8 13:10:37 2018 +0200 Committer: Mark Payne <[email protected]> Committed: Thu Oct 11 15:41:38 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenHTTP.java | 30 +- .../standard/servlets/ListenHTTPServlet.java | 420 +++++++++---------- 2 files changed, 223 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 39c91db..5ea9f3a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -134,21 +134,21 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .defaultValue(String.valueOf(HttpServletResponse.SC_OK)) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); - public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() - .name("max-request-size") - .displayName("Max Request Size") + public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder() + .name("multipart-request-max-size") + .displayName("Multipart Request Max Size") .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, " + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .defaultValue("1 MB") .build(); - public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder() - .name("in-memory-file-size-threshold") - .displayName("The threshold size, at which the contents of an incoming file would be written to disk. " + public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("multipart-read-buffer-size") + .displayName("Multipart Read Buffer Size") + .description("The threshold size, at which the contents of an incoming file would be written to disk. " + "Only applies for requests with Content-Type: multipart/form-data. " + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.") - .description("The threshold value for writing a file to disk.") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .defaultValue("512 KB") @@ -164,8 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode"; - public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize"; - public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold"; + public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = "multipartRequestMaxSize"; + public static final String CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE = "multipartReadBufferSize"; private volatile Server server = null; private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); @@ -187,8 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { descriptors.add(MAX_UNCONFIRMED_TIME); descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); descriptors.add(RETURN_CODE); - descriptors.add(MAX_REQUEST_SIZE); - descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD); + descriptors.add(MULTIPART_REQUEST_MAX_SIZE); + descriptors.add(MULTIPART_READ_BUFFER_SIZE); this.properties = Collections.unmodifiableList(descriptors); } @@ -237,8 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); final int returnCode = context.getProperty(RETURN_CODE).asInteger(); - long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue(); - int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue(); + long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue(); + int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); throttlerRef.set(streamThrottler); final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null; @@ -321,8 +321,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode); - contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize); - contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, requestMaxSize); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE, readBufferSize); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa42635/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 8803f0b..07ccd69 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -16,6 +16,37 @@ */ package org.apache.nifi.processors.standard.servlets; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processors.standard.ListenHTTP; +import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; +import org.apache.nifi.stream.io.StreamThrottler; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.FlowFileUnpackager; +import org.apache.nifi.util.FlowFileUnpackagerV1; +import org.apache.nifi.util.FlowFileUnpackagerV2; +import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.eclipse.jetty.server.Request; + +import javax.servlet.MultipartConfigElement; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; @@ -37,40 +68,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; -import javax.servlet.MultipartConfigElement; -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.Part; -import javax.ws.rs.Path; -import javax.ws.rs.core.MediaType; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processors.standard.ListenHTTP; -import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; -import org.apache.nifi.stream.io.StreamThrottler; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.FlowFileUnpackager; -import org.apache.nifi.util.FlowFileUnpackagerV1; -import org.apache.nifi.util.FlowFileUnpackagerV2; -import org.apache.nifi.util.FlowFileUnpackagerV3; -import org.eclipse.jetty.server.Request; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; - - @Path("") public class ListenHTTPServlet extends HttpServlet { @@ -105,8 +102,8 @@ public class ListenHTTPServlet extends HttpServlet { private StreamThrottler streamThrottler; private String basePath; private int returnCode; - private long maxRequestSize; - private int inMemoryFileSizeThreshold; + private long multipartRequestMaxSize; + private int multipartReadBufferSize; @SuppressWarnings("unchecked") @Override @@ -121,8 +118,8 @@ public class ListenHTTPServlet extends HttpServlet { this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE); - this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE); - this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD); + this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE); + this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE); } @Override @@ -204,11 +201,11 @@ public class ListenHTTPServlet extends HttpServlet { logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); } - Set<FlowFile> flowFileSet = null; + Set<FlowFile> flowFileSet; if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) { - flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in); + flowFileSet = handleMultipartRequest(request, session, foundSubject); } else { - flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in); + flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in); } proceedFlow(request, response, session, foundSubject, createHold, flowFileSet); } catch (final Throwable t) { @@ -217,199 +214,198 @@ public class ListenHTTPServlet extends HttpServlet { } private void handleException(final HttpServletRequest request, final HttpServletResponse response, - final ProcessSession session, String foundSubject, final Throwable t) throws IOException { - session.rollback(); - logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t}); - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); + final ProcessSession session, String foundSubject, final Throwable t) throws IOException { + session.rollback(); + logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t}); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); } - private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, - boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException { - Set<FlowFile> flowFileSet = new HashSet<>(); - String tempDir = System.getProperty("java.io.tmpdir"); - request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold)); - List<Part> requestParts = ImmutableList.copyOf(request.getParts()); - for (int i = 0; i < requestParts.size(); i++) { - Part part = requestParts.get(i); - FlowFile flowFile = session.create(); - try (OutputStream flowFileOoutputStream = session.write(flowFile)) { - StreamUtils.copy(part.getInputStream(), flowFileOoutputStream); + private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject) throws IOException, IllegalStateException, ServletException { + Set<FlowFile> flowFileSet = new HashSet<>(); + String tempDir = System.getProperty("java.io.tmpdir"); + request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize)); + List<Part> requestParts = ImmutableList.copyOf(request.getParts()); + for (int i = 0; i < requestParts.size(); i++) { + Part part = requestParts.get(i); + FlowFile flowFile = session.create(); + try (OutputStream flowFileOutputStream = session.write(flowFile)) { + StreamUtils.copy(part.getInputStream(), flowFileOutputStream); + } + flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile); + flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size()); + flowFileSet.add(flowFile); } - flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile); - flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size()); - flowFileSet.add(flowFile); - } - return flowFileSet; + return flowFileSet; } - private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { - Map<String, String> attributes = new HashMap<>(); - for (String headerName : part.getHeaderNames()) { - final String headerValue = part.getHeader(headerName); - putAttribute(attributes, "http.headers.multipart." + headerName, headerValue); - } - putAttribute(attributes, "http.multipart.size", part.getSize()); - putAttribute(attributes, "http.multipart.content.type", part.getContentType()); - putAttribute(attributes, "http.multipart.name", part.getName()); - putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName()); - putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1); - putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount); - return session.putAllAttributes(flowFile, attributes); + private FlowFile savePartDetailsAsAttributes(final ProcessSession session, final Part part, final FlowFile flowFile, final int sequenceNumber, final int allPartsCount) { + final Map<String, String> attributes = new HashMap<>(); + for (String headerName : part.getHeaderNames()) { + final String headerValue = part.getHeader(headerName); + putAttribute(attributes, "http.headers.multipart." + headerName, headerValue); + } + putAttribute(attributes, "http.multipart.size", part.getSize()); + putAttribute(attributes, "http.multipart.content.type", part.getContentType()); + putAttribute(attributes, "http.multipart.name", part.getName()); + putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName()); + putAttribute(attributes, "http.multipart.fragments.sequence.number", sequenceNumber + 1); + putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount); + return session.putAllAttributes(flowFile, attributes); } private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, - String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) { - FlowFile flowFile = null; - String holdUuid = null; - final AtomicBoolean hasMoreData = new AtomicBoolean(false); - final FlowFileUnpackager unpackager; - if (APPLICATION_FLOW_FILE_V3.equals(contentType)) { - unpackager = new FlowFileUnpackagerV3(); - } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) { - unpackager = new FlowFileUnpackagerV2(); - } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) { - unpackager = new FlowFileUnpackagerV1(); - } else { - unpackager = null; - } - - final Set<FlowFile> flowFileSet = new HashSet<>(); - - do { - final long startNanos = System.nanoTime(); - final Map<String, String> attributes = new HashMap<>(); - flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) { - if (unpackager == null) { - IOUtils.copy(in, bos); - hasMoreData.set(false); - } else { - attributes.putAll(unpackager.unpackageFlowFile(in, bos)); - - if (destinationIsLegacyNiFi) { - if (attributes.containsKey("nf.file.name")) { - // for backward compatibility with old nifi... - attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); - } - - if (attributes.containsKey("nf.file.path")) { - attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); - } - } - - hasMoreData.set(unpackager.hasMoreData()); - } - } - } - }); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - // put metadata on flowfile - final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); - if (StringUtils.isNotBlank(nameVal)) { - attributes.put(CoreAttributes.FILENAME.key(), nameVal); - } - - String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); - if (sourceSystemFlowFileIdentifier != null) { - sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; - - // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's - // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event - attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - } - - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile); - session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); - flowFileSet.add(flowFile); - - if (holdUuid == null) { - holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); - } - } while (hasMoreData.get()); - return flowFileSet; + String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) { + FlowFile flowFile = null; + String holdUuid = null; + final AtomicBoolean hasMoreData = new AtomicBoolean(false); + final FlowFileUnpackager unpackager; + if (APPLICATION_FLOW_FILE_V3.equals(contentType)) { + unpackager = new FlowFileUnpackagerV3(); + } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) { + unpackager = new FlowFileUnpackagerV2(); + } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) { + unpackager = new FlowFileUnpackagerV1(); + } else { + unpackager = null; + } + + final Set<FlowFile> flowFileSet = new HashSet<>(); + + do { + final long startNanos = System.nanoTime(); + final Map<String, String> attributes = new HashMap<>(); + flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) { + if (unpackager == null) { + IOUtils.copy(in, bos); + hasMoreData.set(false); + } else { + attributes.putAll(unpackager.unpackageFlowFile(in, bos)); + + if (destinationIsLegacyNiFi) { + if (attributes.containsKey("nf.file.name")) { + // for backward compatibility with old nifi... + attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); + } + + if (attributes.containsKey("nf.file.path")) { + attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); + } + } + + hasMoreData.set(unpackager.hasMoreData()); + } + } + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + // put metadata on flowfile + final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); + if (StringUtils.isNotBlank(nameVal)) { + attributes.put(CoreAttributes.FILENAME.key(), nameVal); + } + + String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); + if (sourceSystemFlowFileIdentifier != null) { + sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; + + // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's + // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + } + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile); + session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); + flowFileSet.add(flowFile); + + if (holdUuid == null) { + holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + } + } while (hasMoreData.get()); + return flowFileSet; } protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session, - String foundSubject, FlowFile flowFile) { - Map<String, String> attributes = new HashMap<>(); - addMatchingRequestHeaders(request, attributes); - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost()); - flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI()); - flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); - return flowFile; + String foundSubject, FlowFile flowFile) { + Map<String, String> attributes = new HashMap<>(); + addMatchingRequestHeaders(request, attributes); + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost()); + flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI()); + flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); + return flowFile; } private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) { - // put arbitrary headers on flow file - for (Enumeration<String> headerEnum = request.getHeaderNames(); - headerEnum.hasMoreElements();) { - String headerName = headerEnum.nextElement(); - if (headerPattern != null && headerPattern.matcher(headerName).matches()) { - String headerValue = request.getHeader(headerName); - attributes.put(headerName, headerValue); - } - } + // put arbitrary headers on flow file + for (Enumeration<String> headerEnum = request.getHeaderNames(); + headerEnum.hasMoreElements(); ) { + String headerName = headerEnum.nextElement(); + if (headerPattern != null && headerPattern.matcher(headerName).matches()) { + String headerValue = request.getHeader(headerName); + attributes.put(headerName, headerValue); + } + } } protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response, - final ProcessSession session, String foundSubject, final boolean createHold, - final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException { - if (createHold) { - String uuid = UUID.randomUUID().toString(); - - if (flowFileMap.containsKey(uuid)) { - uuid = UUID.randomUUID().toString(); - } - - final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); - FlowFileEntryTimeWrapper previousWrapper; - do { - previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); - if (previousWrapper != null) { - uuid = UUID.randomUUID().toString(); - } - } while (previousWrapper != null); - - response.setStatus(HttpServletResponse.SC_SEE_OTHER); - final String ackUri = "/" + basePath + "/holds/" + uuid; - response.addHeader(LOCATION_HEADER_NAME, ackUri); - response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); - response.getOutputStream().write(ackUri.getBytes("UTF-8")); - if (logger.isDebugEnabled()) { - logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", - new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); - } - } else { - response.setStatus(this.returnCode); - logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'", - new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject}); - - session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); - session.commit(); - } + final ProcessSession session, String foundSubject, final boolean createHold, + final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException { + if (createHold) { + String uuid = UUID.randomUUID().toString(); + + if (flowFileMap.containsKey(uuid)) { + uuid = UUID.randomUUID().toString(); + } + + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); + FlowFileEntryTimeWrapper previousWrapper; + do { + previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); + if (previousWrapper != null) { + uuid = UUID.randomUUID().toString(); + } + } while (previousWrapper != null); + + response.setStatus(HttpServletResponse.SC_SEE_OTHER); + final String ackUri = "/" + basePath + "/holds/" + uuid; + response.addHeader(LOCATION_HEADER_NAME, ackUri); + response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); + response.getOutputStream().write(ackUri.getBytes("UTF-8")); + if (logger.isDebugEnabled()) { + logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", + new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); + } + } else { + response.setStatus(this.returnCode); + logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'", + new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject}); + + session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); + session.commit(); + } } private void putAttribute(final Map<String, String> map, final String key, final Object value) { - if (value == null) { - return; - } + if (value == null) { + return; + } - putAttribute(map, key, value.toString()); - } + putAttribute(map, key, value.toString()); + } private void putAttribute(final Map<String, String> map, final String key, final String value) { - if (value == null) { - return; - } + if (value == null) { + return; + } - map.put(key, value); - } + map.put(key, value); + } }
