Repository: nifi Updated Branches: refs/heads/master 8398ea77b -> 5aa426358
NIFI-1490: multipart/form-data support for ListenHTTP processor - introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk - using java.io.tmpdir for such file writes - enhancing documentation Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c81a1351 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c81a1351 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c81a1351 Branch: refs/heads/master Commit: c81a135161c7af106117dcc616706798c4564669 Parents: 8398ea7 Author: Endre Zoltan Kovacs <[email protected]> Authored: Thu Sep 6 17:33:33 2018 +0200 Committer: Mark Payne <[email protected]> Committed: Thu Oct 11 15:41:19 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenHTTP.java | 29 +- .../standard/servlets/ListenHTTPServlet.java | 329 ++++++++++++------- .../processors/standard/TestListenHTTP.java | 174 ++++++++-- 3 files changed, 392 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 209e6d1..39c91db 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -134,6 +134,25 @@ public class 
ListenHTTP extends AbstractSessionFactoryProcessor { .defaultValue(String.valueOf(HttpServletResponse.SC_OK)) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max-request-size") + .displayName("Max Request Size") + .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, " + + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + public static final PropertyDescriptor IN_MEMORY_FILE_SIZE_THRESHOLD = new PropertyDescriptor.Builder() + .name("in-memory-file-size-threshold") + .displayName("The threshold size, at which the contents of an incoming file would be written to disk. " + + "Only applies for requests with Content-Type: multipart/form-data. " + + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.") + .description("The threshold value for writing a file to disk.") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("512 KB") + .build(); public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; @@ -145,6 +164,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode"; + public static final String CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE = "maxRequestSize"; + public static final String CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD = "inMemoryFileSizeThreshold"; private volatile Server server = null; private final ConcurrentMap<String, 
FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); @@ -166,6 +187,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { descriptors.add(MAX_UNCONFIRMED_TIME); descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); descriptors.add(RETURN_CODE); + descriptors.add(MAX_REQUEST_SIZE); + descriptors.add(IN_MEMORY_FILE_SIZE_THRESHOLD); this.properties = Collections.unmodifiableList(descriptors); } @@ -214,6 +237,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); final int returnCode = context.getProperty(RETURN_CODE).asInteger(); + long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue(); + int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue(); throttlerRef.set(streamThrottler); final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null; @@ -295,7 +320,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); - contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE,returnCode); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE, maxRequestSize); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD, inMemoryFileSizeThreshold); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { 
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 85de6f8..8803f0b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -20,10 +20,12 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.security.cert.X509Certificate; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -34,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; + +import javax.servlet.MultipartConfigElement; import javax.servlet.ServletConfig; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; import javax.ws.rs.Path; 
import javax.ws.rs.core.MediaType; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.flowfile.FlowFile; @@ -54,10 +60,15 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.standard.ListenHTTP; import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; import org.apache.nifi.stream.io.StreamThrottler; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FlowFileUnpackager; import org.apache.nifi.util.FlowFileUnpackagerV1; import org.apache.nifi.util.FlowFileUnpackagerV2; import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.eclipse.jetty.server.Request; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; @Path("") @@ -94,6 +105,8 @@ public class ListenHTTPServlet extends HttpServlet { private StreamThrottler streamThrottler; private String basePath; private int returnCode; + private long maxRequestSize; + private int inMemoryFileSizeThreshold; @SuppressWarnings("unchecked") @Override @@ -108,6 +121,8 @@ public class ListenHTTPServlet extends HttpServlet { this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE); + this.maxRequestSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MAX_REQUEST_SIZE); + this.inMemoryFileSizeThreshold = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_IN_MEMORY_FILE_SIZE_THRESHOLD); } @Override @@ -133,8 +148,6 @@ public class ListenHTTPServlet extends HttpServlet { } while (sessionFactory == null); final ProcessSession session = sessionFactory.createSession(); - FlowFile flowFile = null; - String holdUuid = null; String foundSubject = null; try { final long n = filesReceived.getAndIncrement() % 
FILES_BEFORE_CHECKING_DESTINATION_SPACE; @@ -191,134 +204,212 @@ public class ListenHTTPServlet extends HttpServlet { logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); } - final AtomicBoolean hasMoreData = new AtomicBoolean(false); - final FlowFileUnpackager unpackager; - if (APPLICATION_FLOW_FILE_V3.equals(contentType)) { - unpackager = new FlowFileUnpackagerV3(); - } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) { - unpackager = new FlowFileUnpackagerV2(); - } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) { - unpackager = new FlowFileUnpackagerV1(); + Set<FlowFile> flowFileSet = null; + if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) { + flowFileSet = handleMultipartRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in); } else { - unpackager = null; + flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in); } + proceedFlow(request, response, session, foundSubject, createHold, flowFileSet); + } catch (final Throwable t) { + handleException(request, response, session, foundSubject, t); + } + } - final Set<FlowFile> flowFileSet = new HashSet<>(); - - do { - final long startNanos = System.nanoTime(); - final Map<String, String> attributes = new HashMap<>(); - flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) { - if (unpackager == null) { - IOUtils.copy(in, bos); - hasMoreData.set(false); - } else { - attributes.putAll(unpackager.unpackageFlowFile(in, bos)); - - if (destinationIsLegacyNiFi) { - if (attributes.containsKey("nf.file.name")) { - // for backward compatibility with old nifi... 
- attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); - } - - if (attributes.containsKey("nf.file.path")) { - attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); - } - } - - hasMoreData.set(unpackager.hasMoreData()); - } - } - } - }); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + private void handleException(final HttpServletRequest request, final HttpServletResponse response, + final ProcessSession session, String foundSubject, final Throwable t) throws IOException { + session.rollback(); + logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t}); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); + } - // put metadata on flowfile - final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); - if (StringUtils.isNotBlank(nameVal)) { - attributes.put(CoreAttributes.FILENAME.key(), nameVal); - } + private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, + boolean destinationIsLegacyNiFi, String contentType, InputStream in) throws IOException, ServletException { + Set<FlowFile> flowFileSet = new HashSet<>(); + String tempDir = System.getProperty("java.io.tmpdir"); + request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold)); + List<Part> requestParts = ImmutableList.copyOf(request.getParts()); + for (int i = 0; i < requestParts.size(); i++) { + Part part = requestParts.get(i); + FlowFile flowFile = session.create(); + try (OutputStream flowFileOoutputStream = session.write(flowFile)) { + StreamUtils.copy(part.getInputStream(), flowFileOoutputStream); + } + flowFile = saveRequestDetailsAsAttributes(request, session, 
foundSubject, flowFile); + flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size()); + flowFileSet.add(flowFile); + } + return flowFileSet; + } - // put arbitrary headers on flow file - for (Enumeration<String> headerEnum = request.getHeaderNames(); - headerEnum.hasMoreElements();) { - String headerName = headerEnum.nextElement(); - if (headerPattern != null && headerPattern.matcher(headerName).matches()) { - String headerValue = request.getHeader(headerName); - attributes.put(headerName, headerValue); - } - } + private FlowFile savePartDetailsAsAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { + Map<String, String> attributes = new HashMap<>(); + for (String headerName : part.getHeaderNames()) { + final String headerValue = part.getHeader(headerName); + putAttribute(attributes, "http.headers.multipart." + headerName, headerValue); + } + putAttribute(attributes, "http.multipart.size", part.getSize()); + putAttribute(attributes, "http.multipart.content.type", part.getContentType()); + putAttribute(attributes, "http.multipart.name", part.getName()); + putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName()); + putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1); + putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount); + return session.putAllAttributes(flowFile, attributes); + } - String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); - if (sourceSystemFlowFileIdentifier != null) { - sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; + private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, + String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) { + FlowFile flowFile = null; + String holdUuid = null; + final AtomicBoolean hasMoreData = new AtomicBoolean(false); 
+ final FlowFileUnpackager unpackager; + if (APPLICATION_FLOW_FILE_V3.equals(contentType)) { + unpackager = new FlowFileUnpackagerV3(); + } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) { + unpackager = new FlowFileUnpackagerV2(); + } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) { + unpackager = new FlowFileUnpackagerV1(); + } else { + unpackager = null; + } + + final Set<FlowFile> flowFileSet = new HashSet<>(); + + do { + final long startNanos = System.nanoTime(); + final Map<String, String> attributes = new HashMap<>(); + flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) { + if (unpackager == null) { + IOUtils.copy(in, bos); + hasMoreData.set(false); + } else { + attributes.putAll(unpackager.unpackageFlowFile(in, bos)); + + if (destinationIsLegacyNiFi) { + if (attributes.containsKey("nf.file.name")) { + // for backward compatibility with old nifi... 
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); + } + + if (attributes.containsKey("nf.file.path")) { + attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); + } + } + + hasMoreData.set(unpackager.hasMoreData()); + } + } + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + // put metadata on flowfile + final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); + if (StringUtils.isNotBlank(nameVal)) { + attributes.put(CoreAttributes.FILENAME.key(), nameVal); + } + + String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); + if (sourceSystemFlowFileIdentifier != null) { + sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; + + // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's + // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + } + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile); + session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); + flowFileSet.add(flowFile); + + if (holdUuid == null) { + holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + } + } while (hasMoreData.get()); + return flowFileSet; + } - // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's - // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event - attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - } + protected FlowFile saveRequestDetailsAsAttributes(final 
HttpServletRequest request, final ProcessSession session, + String foundSubject, FlowFile flowFile) { + Map<String, String> attributes = new HashMap<>(); + addMatchingRequestHeaders(request, attributes); + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost()); + flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI()); + flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); + return flowFile; + } - flowFile = session.putAllAttributes(flowFile, attributes); - session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); - flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost()); - flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI()); - flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); - flowFileSet.add(flowFile); + private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) { + // put arbitrary headers on flow file + for (Enumeration<String> headerEnum = request.getHeaderNames(); + headerEnum.hasMoreElements();) { + String headerName = headerEnum.nextElement(); + if (headerPattern != null && headerPattern.matcher(headerName).matches()) { + String headerValue = request.getHeader(headerName); + attributes.put(headerName, headerValue); + } + } + } - if (holdUuid == null) { - holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); - } - } while (hasMoreData.get()); + protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response, + final ProcessSession session, String foundSubject, final boolean createHold, + final Set<FlowFile> flowFileSet) throws IOException, 
UnsupportedEncodingException { + if (createHold) { + String uuid = UUID.randomUUID().toString(); + + if (flowFileMap.containsKey(uuid)) { + uuid = UUID.randomUUID().toString(); + } + + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); + FlowFileEntryTimeWrapper previousWrapper; + do { + previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); + if (previousWrapper != null) { + uuid = UUID.randomUUID().toString(); + } + } while (previousWrapper != null); + + response.setStatus(HttpServletResponse.SC_SEE_OTHER); + final String ackUri = "/" + basePath + "/holds/" + uuid; + response.addHeader(LOCATION_HEADER_NAME, ackUri); + response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); + response.getOutputStream().write(ackUri.getBytes("UTF-8")); + if (logger.isDebugEnabled()) { + logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", + new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); + } + } else { + response.setStatus(this.returnCode); + logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'", + new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject}); + + session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); + session.commit(); + } + } - if (createHold) { - String uuid = (holdUuid == null) ? 
UUID.randomUUID().toString() : holdUuid; + private void putAttribute(final Map<String, String> map, final String key, final Object value) { + if (value == null) { + return; + } - if (flowFileMap.containsKey(uuid)) { - uuid = UUID.randomUUID().toString(); - } + putAttribute(map, key, value.toString()); + } - final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); - FlowFileEntryTimeWrapper previousWrapper; - do { - previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); - if (previousWrapper != null) { - uuid = UUID.randomUUID().toString(); - } - } while (previousWrapper != null); - - response.setStatus(HttpServletResponse.SC_SEE_OTHER); - final String ackUri = "/" + basePath + "/holds/" + uuid; - response.addHeader(LOCATION_HEADER_NAME, ackUri); - response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); - response.getOutputStream().write(ackUri.getBytes("UTF-8")); - if (logger.isDebugEnabled()) { - logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", - new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); - } - } else { - response.setStatus(this.returnCode); - logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", - new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); + private void putAttribute(final Map<String, String> map, final String key, final String value) { + if (value == null) { + return; + } - session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); - session.commit(); - } - } catch (final Throwable t) { - session.rollback(); - if (flowFile == null) { - logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", - new Object[]{request.getRemoteHost(), foundSubject, t}); - } else { - logger.error("Unable to 
receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", - new Object[]{flowFile, request.getRemoteHost(), foundSubject, t}); - } - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); - } - } + map.put(key, value); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c81a1351/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java index 117a068..f8e9015 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -27,15 +27,31 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Charsets; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.io.Files; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HttpsURLConnection; import 
javax.net.ssl.SSLContext; @@ -66,7 +82,7 @@ public class TestListenHTTP { public void setup() throws IOException { proc = new ListenHTTP(); runner = TestRunners.newTestRunner(proc); - availablePort = NetworkUtils.availablePort();; + availablePort = NetworkUtils.availablePort(); runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH); @@ -181,8 +197,8 @@ public class TestListenHTTP { private int executePOST(String message) throws Exception { final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class); final boolean secure = (sslContextService != null); - final String scheme = secure ? "https" : "http"; - final URL url = new URL(scheme + "://localhost:" + availablePort + "/" + HTTP_BASE_PATH); + String endpointUrl = buildUrl(secure); + final URL url = new URL(endpointUrl); HttpURLConnection connection; if (secure) { @@ -207,6 +223,10 @@ public class TestListenHTTP { return connection.getResponseCode(); } + private String buildUrl(final boolean secure) { + return String.format("%s://localhost:%s/%s", secure ? 
"https" : "http" , availablePort, HTTP_BASE_PATH); + } + private void testPOSTRequestsReceived(int returnCode) throws Exception { final List<String> messages = new ArrayList<>(); messages.add("payload 1"); @@ -225,13 +245,29 @@ public class TestListenHTTP { mockFlowFiles.get(3).assertContentEquals("payload 2"); } + private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles, int returnCode) throws Exception { + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.createHttpServer(context); + + new Thread(sendRequestToWebserver).start(); + + long responseTimeout = 10000; + + int numTransferred = 0; + long startTime = System.currentTimeMillis(); + while (numTransferred < numberOfExpectedFlowFiles && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size(); + Thread.sleep(100); + } + + runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles); + } + private void startWebServerAndSendMessages(final List<String> messages, int returnCode) throws Exception { - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.createHttpServer(context); - Runnable sendMessagestoWebServer = () -> { try { for (final String message : messages) { @@ -244,20 +280,8 @@ public class TestListenHTTP { fail("Not expecting error here."); } }; - new Thread(sendMessagestoWebServer).start(); - - long responseTimeout = 10000; - - int numTransferred = 0; - long startTime = System.currentTimeMillis(); - while (numTransferred < messages.size() && (System.currentTimeMillis() - startTime < responseTimeout)) { - proc.onTrigger(context, processSessionFactory); - numTransferred = 
runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size(); - Thread.sleep(100); - } - - runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size()); + startWebServerAndSendRequests(sendMessagestoWebServer, messages.size(), returnCode); } private SSLContextService configureProcessorSslContextService() throws InitializationException { @@ -287,4 +311,114 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, SSL_CONTEXT_SERVICE_IDENTIFIER); return sslContextService; } + + + @Test(/*timeout=10000*/) + public void testMultipartFormDataRequest() throws Exception { + + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK)); + + final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class); + final boolean isSecure = (sslContextService != null); + + Runnable sendRequestToWebserver = () -> { + try { + MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM) + .addFormDataPart("p1", "v1") + .addFormDataPart("p2", "v2") + .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) + .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"))) + .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) + .build(); + + Request request = + new Request.Builder() + .url(buildUrl(isSecure)) + .post(multipartBody) + .build(); + + int timeout = 3000; + OkHttpClient client = new OkHttpClient.Builder() + .readTimeout(timeout, TimeUnit.MILLISECONDS) + .writeTimeout(timeout, 
TimeUnit.MILLISECONDS) + .build(); + + try (Response response = client.newCall(request).execute()) { + Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful()); + } + } catch (final Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + }; + + + startWebServerAndSendRequests(sendRequestToWebserver, 5, 200); + + runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS); + // Part fragments are not processed in the order we submitted them. + // We cannot rely on the order we sent them in. + MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1"); + mff.assertAttributeEquals("http.multipart.name", "p1"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2"); + mff.assertAttributeEquals("http.multipart.name", "p2"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1"); + mff.assertAttributeEquals("http.multipart.name", "file1"); + mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + 
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2"); + mff.assertAttributeEquals("http.multipart.name", "file2"); + mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3"); + mff.assertAttributeEquals("http.multipart.name", "file3"); + mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + } + + private byte[] generateRandomBinaryData(int i) { + byte[] bytes = new byte[100]; + new Random().nextBytes(bytes); + return bytes; + } + private File createTextFile(String fileName, String... 
lines) throws IOException { + File file = new File(fileName); + file.deleteOnExit(); + for (String string : lines) { + Files.append(string, file, Charsets.UTF_8); + } + return file; + } + protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) { + Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue)); + Assert.assertTrue(optional.isPresent()); + return optional.get(); + } }
