Repository: nifi Updated Branches: refs/heads/master ce25ae541 -> d5f071b2f
NIFI-3469 multipart request support added to HandleHttpRequest - introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk - using java.io.tmpdir for such file writes - enhancing documentation - documenting how to avoid premature HTTP response - fix and UT for unsuccessful request registration This closes #2991. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d5f071b2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d5f071b2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d5f071b2 Branch: refs/heads/master Commit: d5f071b2fe5240d4face9b7c8582ba45fd668294 Parents: ce25ae5 Author: Endre Zoltan Kovacs <[email protected]> Authored: Tue Sep 4 16:35:37 2018 +0200 Committer: Mark Payne <[email protected]> Committed: Mon Oct 15 13:14:31 2018 -0400 ---------------------------------------------------------------------- .../processors/standard/HandleHttpRequest.java | 401 ++++++++++++------- .../additionalDetails.html | 13 + .../standard/TestHandleHttpRequest.java | 225 +++++++++++ 3 files changed, 497 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index ea27188..d03774e 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -32,6 +32,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.http.HttpContextMap; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -52,12 +53,18 @@ import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + import javax.servlet.AsyncContext; import javax.servlet.DispatcherType; +import javax.servlet.MultipartConfigElement; import javax.servlet.ServletException; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.io.OutputStream; @@ -109,10 +116,27 @@ import java.util.regex.Pattern; + "This value will not be populated unless the Processor is configured to use an SSLContext Service"), @WritesAttribute(attribute = "http.headers.XXX", description = "Each of the HTTP Headers that is received in the request will be added as an " + "attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value " - + "will be added to an attribute named \"http.headers.x-my-header\"")}) + + "will be added to an attribute named \"http.headers.x-my-header\""), 
+ @WritesAttribute(attribute = "http.headers.multipart.XXX", description = "Each of the HTTP Headers that is received in the mulipart request will be added as an " + + "attribute, prefixed with \"http.headers.multipart.\" For example, if the multipart request contains an HTTP Header named \"content-disposition\", then the value " + + "will be added to an attribute named \"http.headers.multipart.content-disposition\""), + @WritesAttribute(attribute = "http.multipart.size", + description = "For requests with Content-Type \"multipart/form-data\", the part's content size is recorded into this attribute"), + @WritesAttribute(attribute = "http.multipart.content.type", + description = "For requests with Content-Type \"multipart/form-data\", the part's content type is recorded into this attribute"), + @WritesAttribute(attribute = "http.multipart.name", + description = "For requests with Content-Type \"multipart/form-data\", the part's name is recorded into this attribute"), + @WritesAttribute(attribute = "http.multipart.filename", + description = "For requests with Content-Type \"multipart/form-data\", when the part contains an uploaded file, the name of the file is recorded into this attribute"), + @WritesAttribute(attribute = "http.multipart.fragments.sequence.number", + description = "For requests with Content-Type \"multipart/form-data\", the part's index is recorded into this attribute. 
The index starts with 1."), + @WritesAttribute(attribute = "http.multipart.fragments.total.number", + description = "For requests with Content-Type \"multipart/form-data\", the count of all parts is recorded into this attribute.")}) @SeeAlso(value = {HandleHttpResponse.class}) public class HandleHttpRequest extends AbstractProcessor { + private static final String MIME_TYPE__MULTIPART_FORM_DATA = "multipart/form-data"; + private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&"); // Allowable values for client auth @@ -229,7 +253,25 @@ public class HandleHttpRequest extends AbstractProcessor { .name("container-queue-size").displayName("Container Queue Size") .description("The size of the queue for Http Request Containers").required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("50").build(); - + public static final PropertyDescriptor MULTIPART_REQUEST_MAX_SIZE = new PropertyDescriptor.Builder() + .name("multipart-request-max-size") + .displayName("Multipart Request Max Size") + .description("The max size of the request. Only applies for requests with Content-Type: multipart/form-data, " + + "and is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + public static final PropertyDescriptor MULTIPART_READ_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("multipart-read-buffer-size") + .description("The threshold size, at which the contents of an incoming file would be written to disk. " + + "Only applies for requests with Content-Type: multipart/form-data. 
" + + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.") + .displayName("Multipart Read Buffer Size") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("512 KB") + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All content that is received is routed to the 'success' relationship") @@ -254,6 +296,8 @@ public class HandleHttpRequest extends AbstractProcessor { descriptors.add(ADDITIONAL_METHODS); descriptors.add(CLIENT_AUTH); descriptors.add(CONTAINER_QUEUE_SIZE); + descriptors.add(MULTIPART_REQUEST_MAX_SIZE); + descriptors.add(MULTIPART_READ_BUFFER_SIZE); propertyDescriptors = Collections.unmodifiableList(descriptors); } @@ -521,161 +565,234 @@ public class HandleHttpRequest extends AbstractProcessor { final long start = System.nanoTime(); final HttpServletRequest request = container.getRequest(); - FlowFile flowFile = session.create(); - try (OutputStream flowFileOut = session.write(flowFile)) { - StreamUtils.copy(request.getInputStream(), flowFileOut); - } catch (final IOException e) { - // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg. - // bad requests, the connection to the client is not closed. In order to address also these cases, we try - // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly - // processed and makes it aware that the connection can be closed. 
- getLogger().error("Failed to receive content from HTTP Request from {} due to {}", - new Object[]{request.getRemoteAddr(), e}); - session.remove(flowFile); - try { - HttpServletResponse response = container.getResponse(); - response.sendError(Status.BAD_REQUEST.getStatusCode()); - response.flushBuffer(); - container.getContext().complete(); - } catch (final IOException ioe) { - getLogger().warn("Failed to send HTTP response to {} due to {}", - new Object[]{request.getRemoteAddr(), ioe}); + if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) { + final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue(); + final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + String tempDir = System.getProperty("java.io.tmpdir"); + request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize)); + try { + List<Part> parts = ImmutableList.copyOf(request.getParts()); + int allPartsCount = parts.size(); + final String contextIdentifier = UUID.randomUUID().toString(); + for (int i = 0; i < allPartsCount; i++) { + Part part = parts.get(i); + FlowFile flowFile = session.create(); + try (OutputStream flowFileOut = session.write(flowFile)) { + StreamUtils.copy(part.getInputStream(), flowFileOut); + } catch (IOException e) { + handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); + return; + } + flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount); + flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); + if (i == 0) { + // each one of multipart comes from a single request, thus registering only once per loop. 
+ boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); + if (!requestRegistrationSuccess) + break; + } + forwardFlowFile(context, session, container, start, request, flowFile); } + } catch (IOException | ServletException | IllegalStateException e) { + handleFlowContentStreamingError(session, container, request, Optional.absent(), e); return; + } + } else { + FlowFile flowFile = session.create(); + try (OutputStream flowFileOut = session.write(flowFile)) { + StreamUtils.copy(request.getInputStream(), flowFileOut); + } catch (final IOException e) { + handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); + return; + } + final String contextIdentifier = UUID.randomUUID().toString(); + flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); + boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); + if (requestRegistrationSuccess) + forwardFlowFile(context, session, container, start, request, flowFile); } + } - final String charset = request.getCharacterEncoding() == null ? 
context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding(); - - final String contextIdentifier = UUID.randomUUID().toString(); - final Map<String, String> attributes = new HashMap<>(); - try { - putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier); - putAttribute(attributes, "mime.type", request.getContentType()); - putAttribute(attributes, "http.servlet.path", request.getServletPath()); - putAttribute(attributes, "http.context.path", request.getContextPath()); - putAttribute(attributes, "http.method", request.getMethod()); - putAttribute(attributes, "http.local.addr", request.getLocalAddr()); - putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName()); - final String queryString = request.getQueryString(); - if (queryString != null) { - putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset)); - } - putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost()); - putAttribute(attributes, "http.remote.addr", request.getRemoteAddr()); - putAttribute(attributes, "http.remote.user", request.getRemoteUser()); - putAttribute(attributes, "http.protocol", request.getProtocol()); - putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI()); - putAttribute(attributes, "http.request.url", request.getRequestURL().toString()); - putAttribute(attributes, "http.auth.type", request.getAuthType()); - - putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId()); - final DispatcherType dispatcherType = request.getDispatcherType(); - if (dispatcherType != null) { - putAttribute(attributes, "http.dispatcher.type", dispatcherType.name()); - } - putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding()); - putAttribute(attributes, "http.locale", request.getLocale()); - putAttribute(attributes, "http.server.name", request.getServerName()); - putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort()); 
- - final Enumeration<String> paramEnumeration = request.getParameterNames(); - while (paramEnumeration.hasMoreElements()) { - final String paramName = paramEnumeration.nextElement(); - final String value = request.getParameter(paramName); - attributes.put("http.param." + paramName, value); - } - - final Cookie[] cookies = request.getCookies(); - if (cookies != null) { - for (final Cookie cookie : cookies) { - final String name = cookie.getName(); - final String cookiePrefix = "http.cookie." + name + "."; - attributes.put(cookiePrefix + "value", cookie.getValue()); - attributes.put(cookiePrefix + "domain", cookie.getDomain()); - attributes.put(cookiePrefix + "path", cookie.getPath()); - attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge())); - attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion())); - attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure())); - } - } - - if (queryString != null) { - final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString); - for (final String keyValueString : params) { - final int indexOf = keyValueString.indexOf("="); - if (indexOf < 0) { - // no =, then it's just a key with no value - attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), ""); - } else { - final String key = keyValueString.substring(0, indexOf); - final String value; - - if (indexOf == keyValueString.length() - 1) { - value = ""; - } else { - value = keyValueString.substring(indexOf + 1); - } - - attributes.put("http.query.param." 
+ URLDecoder.decode(key, charset), URLDecoder.decode(value, charset)); - } - } - } - } catch (final UnsupportedEncodingException uee) { - throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated - } - - final Enumeration<String> headerNames = request.getHeaderNames(); - while (headerNames.hasMoreElements()) { - final String headerName = headerNames.nextElement(); - final String headerValue = request.getHeader(headerName); - putAttribute(attributes, "http.headers." + headerName, headerValue); - } - - final Principal principal = request.getUserPrincipal(); - if (principal != null) { - putAttribute(attributes, "http.principal.name", principal.getName()); - } + private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { + final Map<String, String> attributes = new HashMap<>(); + for (String headerName : part.getHeaderNames()) { + final String headerValue = part.getHeader(headerName); + putAttribute(attributes, "http.headers.multipart." 
+ headerName, headerValue); + } + putAttribute(attributes, "http.multipart.size", part.getSize()); + putAttribute(attributes, "http.multipart.content.type", part.getContentType()); + putAttribute(attributes, "http.multipart.name", part.getName()); + putAttribute(attributes, "http.multipart.filename", part.getSubmittedFileName()); + putAttribute(attributes, "http.multipart.fragments.sequence.number", i+1); + putAttribute(attributes, "http.multipart.fragments.total.number", allPartsCount); + return session.putAllAttributes(flowFile, attributes); + } - final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); - final String subjectDn; - if (certs != null && certs.length > 0) { - final X509Certificate cert = certs[0]; - subjectDn = cert.getSubjectDN().getName(); - final String issuerDn = cert.getIssuerDN().getName(); + private FlowFile saveRequestAttributes(final ProcessContext context, final ProcessSession session, HttpServletRequest request, FlowFile flowFile, String contextIdentifier) { + final String charset = request.getCharacterEncoding() == null ? 
context.getProperty(URL_CHARACTER_SET).getValue() : request.getCharacterEncoding(); + + final Map<String, String> attributes = new HashMap<>(); + try { + putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier); + putAttribute(attributes, "mime.type", request.getContentType()); + putAttribute(attributes, "http.servlet.path", request.getServletPath()); + putAttribute(attributes, "http.context.path", request.getContextPath()); + putAttribute(attributes, "http.method", request.getMethod()); + putAttribute(attributes, "http.local.addr", request.getLocalAddr()); + putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName()); + final String queryString = request.getQueryString(); + if (queryString != null) { + putAttribute(attributes, "http.query.string", URLDecoder.decode(queryString, charset)); + } + putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost()); + putAttribute(attributes, "http.remote.addr", request.getRemoteAddr()); + putAttribute(attributes, "http.remote.user", request.getRemoteUser()); + putAttribute(attributes, "http.protocol", request.getProtocol()); + putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI()); + putAttribute(attributes, "http.request.url", request.getRequestURL().toString()); + putAttribute(attributes, "http.auth.type", request.getAuthType()); + + putAttribute(attributes, "http.requested.session.id", request.getRequestedSessionId()); + final DispatcherType dispatcherType = request.getDispatcherType(); + if (dispatcherType != null) { + putAttribute(attributes, "http.dispatcher.type", dispatcherType.name()); + } + putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding()); + putAttribute(attributes, "http.locale", request.getLocale()); + putAttribute(attributes, "http.server.name", request.getServerName()); + putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort()); + + final Enumeration<String> paramEnumeration = 
request.getParameterNames(); + while (paramEnumeration.hasMoreElements()) { + final String paramName = paramEnumeration.nextElement(); + final String value = request.getParameter(paramName); + attributes.put("http.param." + paramName, value); + } + + final Cookie[] cookies = request.getCookies(); + if (cookies != null) { + for (final Cookie cookie : cookies) { + final String name = cookie.getName(); + final String cookiePrefix = "http.cookie." + name + "."; + attributes.put(cookiePrefix + "value", cookie.getValue()); + attributes.put(cookiePrefix + "domain", cookie.getDomain()); + attributes.put(cookiePrefix + "path", cookie.getPath()); + attributes.put(cookiePrefix + "max.age", String.valueOf(cookie.getMaxAge())); + attributes.put(cookiePrefix + "version", String.valueOf(cookie.getVersion())); + attributes.put(cookiePrefix + "secure", String.valueOf(cookie.getSecure())); + } + } + + if (queryString != null) { + final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString); + for (final String keyValueString : params) { + final int indexOf = keyValueString.indexOf("="); + if (indexOf < 0) { + // no =, then it's just a key with no value + attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), ""); + } else { + final String key = keyValueString.substring(0, indexOf); + final String value; + + if (indexOf == keyValueString.length() - 1) { + value = ""; + } else { + value = keyValueString.substring(indexOf + 1); + } + + attributes.put("http.query.param." 
+ URLDecoder.decode(key, charset), URLDecoder.decode(value, charset)); + } + } + } + } catch (final UnsupportedEncodingException uee) { + throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated + } + + final Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + final String headerName = headerNames.nextElement(); + final String headerValue = request.getHeader(headerName); + putAttribute(attributes, "http.headers." + headerName, headerValue); + } + + final Principal principal = request.getUserPrincipal(); + if (principal != null) { + putAttribute(attributes, "http.principal.name", principal.getName()); + } + + final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); + final String subjectDn; + if (certs != null && certs.length > 0) { + final X509Certificate cert = certs[0]; + subjectDn = cert.getSubjectDN().getName(); + final String issuerDn = cert.getIssuerDN().getName(); + + putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn); + putAttribute(attributes, "http.issuer.dn", issuerDn); + } else { + subjectDn = null; + } + + return session.putAllAttributes(flowFile, attributes); + } - putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn); - putAttribute(attributes, "http.issuer.dn", issuerDn); - } else { - subjectDn = null; - } + private void forwardFlowFile(final ProcessContext context, final ProcessSession session, + HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) { + final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT); + session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), + "Received from " + request.getRemoteAddr() + (subjectDn == null ? 
"" : " with DN=" + subjectDn), receiveMillis); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); + } - flowFile = session.putAllAttributes(flowFile, attributes); + private boolean registerRequest(final ProcessContext context, final ProcessSession session, + HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) { final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); + String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext()); + if (registered) + return true; - if (!registered) { - getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", - new Object[]{request.getRemoteAddr()}); - - try { - container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode()); - container.getResponse().flushBuffer(); - container.getContext().complete(); - } catch (final Exception e) { - getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", - new Object[]{request.getRemoteAddr(), e}); - } + getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", + new Object[]{request.getRemoteAddr()}); - session.remove(flowFile); - return; + try { + container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode()); + container.getResponse().flushBuffer(); + container.getContext().complete(); + } catch (final Exception e) { + getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", + new Object[]{request.getRemoteAddr(), e}); } - final long receiveMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); + session.remove(flowFile); + return false; + } + + + protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container, + final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) { + // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg. + // bad requests, the connection to the client is not closed. In order to address also these cases, we try + // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly + // processed and makes it aware that the connection can be closed. 
+ getLogger().error("Failed to receive content from HTTP Request from {} due to {}", + new Object[]{request.getRemoteAddr(), e}); + if (flowFile.isPresent()) + session.remove(flowFile.get()); + + try { + HttpServletResponse response = container.getResponse(); + response.sendError(Status.BAD_REQUEST.getStatusCode()); + response.flushBuffer(); + container.getContext().complete(); + } catch (final IOException ioe) { + getLogger().warn("Failed to send HTTP response to {} due to {}", + new Object[]{request.getRemoteAddr(), ioe}); + } } private void putAttribute(final Map<String, String> map, final String key, final Object value) { http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html index 70adb85..6510e12 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HandleHttpRequest/additionalDetails.html @@ -39,5 +39,18 @@ The HandleHttpRequest Processor provides several Properties to configure which methods are supported, the paths that are supported, and SSL configuration. </p> + + <p> + To handle requests with Content-Type: <i>multipart/form-data</i> containing multiple parts, additional attention needs to be paid. 
+ Each <i>part</i> generates a FlowFile of its own. To each of these FlowFiles, the following special attributes are written:</p> + <ul> + <li>http.context.identifier</li> + <li>http.multipart.fragments.sequence.number</li> + <li>http.multipart.fragments.total.number</li> + </ul> + <p>These attributes can be used to implement a gating mechanism in front of the HandleHttpResponse processor, so that the HTTP response is sent only after every FlowFile of the request has been processed: + that is, all FlowFiles with <b>http.multipart.fragments.sequence.number</b> from 1 up to <b>http.multipart.fragments.total.number</b>, + sharing the same <b>http.context.identifier</b>, which is unique to the request. + </p> </body> </html> http://git-wip-us.apache.org/repos/asf/nifi/blob/d5f071b2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java index 8c98a1c..6352291 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java @@ -18,15 +18,19 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; +import java.io.File; import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -36,6 +40,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.http.HttpContextMap; +import org.apache.nifi.processors.standard.util.HTTPUtils; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardRestrictedSSLContextService; @@ -48,6 +53,18 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; +import com.google.api.client.util.Charsets; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.io.Files; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + public class TestHandleHttpRequest { private static Map<String, String> getTruststoreProperties() { @@ -141,6 +158,214 @@ public class TestHandleHttpRequest { @Test(timeout=10000) + public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + + // trigger processor to stop but not shutdown.
+ runner.run(1, false); + try { + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + + MultipartBody multipartBody = new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("p1", "v1") + .addFormDataPart("p2", "v2") + .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) + .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }"))) + .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) + .build(); + + Request request = new Request.Builder() + .url(String.format("http://localhost:%s/my/path", port)) + .post(multipartBody).build(); + + OkHttpClient client = + new OkHttpClient.Builder() + .readTimeout(3000, TimeUnit.MILLISECONDS) + .writeTimeout(3000, TimeUnit.MILLISECONDS) + .build(); + + try (Response response = client.newCall(request).execute()) { + Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful()); + } + } catch (final Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + } + }); + httpThread.start(); + + while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { + // process the request. + runner.run(1, false, false); + } + + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5); + assertEquals(1, contextMap.size()); + + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS); + + // Part fragments are not processed in the order we submitted them. + // We cannot rely on the order we sent them in.
+ MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1"); + String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); + mff.assertAttributeEquals("http.multipart.name", "p1"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2"); + // each part generates its own flow file, yet all parts come from the same request and so share the same context identifier + mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId); + mff.assertAttributeEquals("http.multipart.name", "p2"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1"); + mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId); + mff.assertAttributeEquals("http.multipart.name", "file1"); + mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2"); + mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId); + mff.assertAttributeEquals("http.multipart.name", "file2"); + mff.assertAttributeEquals("http.multipart.filename",
"my-file-data.json"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + + mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3"); + mff.assertAttributeEquals(HTTPUtils.HTTP_CONTEXT_ID, contextId); + mff.assertAttributeEquals("http.multipart.name", "file3"); + mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin"); + mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream"); + mff.assertAttributeExists("http.multipart.size"); + mff.assertAttributeExists("http.multipart.fragments.sequence.number"); + mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); + mff.assertAttributeExists("http.headers.multipart.content-disposition"); + } finally { + // shut down the server + runner.run(1, true); + } + } + + @Test(timeout=10000) + public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + contextMap.setRegisterSuccessfully(false); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + + // trigger processor to stop but not shutdown.
+ runner.run(1, false); + try { + AtomicInteger responseCode = new AtomicInteger(0); + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + + MultipartBody multipartBody = new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("p1", "v1") + .addFormDataPart("p2", "v2") + .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) + .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-data.json", "{ \"name\":\"John\", \"age\":30 }"))) + .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) + .build(); + + Request request = new Request.Builder() + .url(String.format("http://localhost:%s/my/path", port)) + .post(multipartBody).build(); + + OkHttpClient client = + new OkHttpClient.Builder() + .readTimeout(3000, TimeUnit.MILLISECONDS) + .writeTimeout(3000, TimeUnit.MILLISECONDS) + .build(); + + try (Response response = client.newCall(request).execute()) { + responseCode.set(response.code()); + } + } catch (final Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + } + }); + httpThread.start(); + + while (responseCode.get() == 0) { + // process the request. + runner.run(1, false, false); + } + + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0); + assertEquals(0, contextMap.size()); + Assert.assertEquals(503, responseCode.get()); + } finally { + // shut down the server + runner.run(1, true); + } + } + + private byte[] generateRandomBinaryData(int size) { + byte[] bytes = new byte[size]; // honour the requested size rather than a fixed length + new Random().nextBytes(bytes); + return bytes; + } + + + private File createTextFile(String fileName, String...
lines) throws IOException { + File file = File.createTempFile("test-", "-" + fileName); // fresh, empty, unique file per call so parts never share or accumulate content across calls/tests + file.deleteOnExit(); + for (String string : lines) { + Files.append(string, file, Charsets.UTF_8); + } + return file; + } + + + protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) { + Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> attributeValue.equals(ff.getAttribute(attributeName))); // null-safe when a FlowFile lacks the attribute + Assert.assertTrue("No FlowFile found with " + attributeName + "=" + attributeValue, optional.isPresent()); + return optional.get(); + } + + + @Test(timeout=10000) public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); runner.setProperty(HandleHttpRequest.PORT, "0");
