GitHub user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2991#discussion_r222578602
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
---
@@ -521,161 +564,223 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final long start = System.nanoTime();
         final HttpServletRequest request = container.getRequest();
-        FlowFile flowFile = session.create();
-        try (OutputStream flowFileOut = session.write(flowFile)) {
-            StreamUtils.copy(request.getInputStream(), flowFileOut);
-        } catch (final IOException e) {
-            // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
-            // bad requests, the connection to the client is not closed. In order to address also these cases, we try
-            // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
-            // processed and makes it aware that the connection can be closed.
-            getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
-                    new Object[]{request.getRemoteAddr(), e});
-            session.remove(flowFile);
-            try {
-                HttpServletResponse response = container.getResponse();
-                response.sendError(Status.BAD_REQUEST.getStatusCode());
-                response.flushBuffer();
-                container.getContext().complete();
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to send HTTP response to {} due to {}",
-                        new Object[]{request.getRemoteAddr(), ioe});
+        if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
+            final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
+            final int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue()
+            String tempDir = System.getProperty("java.io.tmpdir");
+            request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, maxRequestSize, maxRequestSize, inMemoryFileSizeThreshold));
+            try {
+                List<Part> parts = ImmutableList.copyOf(request.getParts());
+                int allPartsCount = parts.size();
+                final String contextIdentifier = UUID.randomUUID().toString();
+                for (int i = 0; i < allPartsCount; i++) {
+                    Part part = parts.get(i);
+                    FlowFile flowFile = session.create();
+                    try (OutputStream flowFileOut = session.write(flowFile)) {
+                        StreamUtils.copy(part.getInputStream(), flowFileOut);
+                    } catch (IOException e) {
+                        handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+                        return;
+                    }
+                    flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
+                    flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
+                    forwardFlowFile(context, session, container, start, request, flowFile, i == 0);
                 }
+            } catch (IOException | ServletException e) {
+                handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
--- End diff --
If the uploaded file size exceeds the configured maximum request size, an
IllegalStateException is thrown. We need to catch it here as well, so that it
goes through handleFlowContentStreamingError and a 400 BAD_REQUEST status code
is returned. When I uploaded a 9 GB file, NiFi logged an ERROR, but the HTTP
client kept sending the remaining data. If we return 400, the client stops
sending data immediately.
FYI, Jetty threw the following exception:
```
2018-10-04 17:18:02,915 ERROR [Timer-Driven Process Thread-10] o.a.n.p.standard.HandleHttpRequest HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] failed to process session due to java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576); Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
    at org.eclipse.jetty.util.MultiPartInputStreamParser.parse(MultiPartInputStreamParser.java:773)
    at org.eclipse.jetty.util.MultiPartInputStreamParser.getParts(MultiPartInputStreamParser.java:493)
    at org.eclipse.jetty.server.MultiParts$MultiPartsUtilParser.<init>(MultiParts.java:121)
    at org.eclipse.jetty.server.Request.newMultiParts(Request.java:2410)
    at org.eclipse.jetty.server.Request.getParts(Request.java:2333)
    at org.eclipse.jetty.server.Request.getParts(Request.java:2319)
    at org.apache.nifi.processors.standard.HandleHttpRequest.onTrigger(HandleHttpRequest.java:575)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
```
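To illustrate the suggestion, here is a minimal, self-contained sketch of the catch I have in mind. It is not the actual processor code: `MultipartSizeLimitSketch`, `PartSource`, `handle` and the status constants are hypothetical stand-ins for `request.getParts()` and `handleFlowContentStreamingError`, so that the example runs without the servlet API. In the PR itself the change would presumably just be adding `IllegalStateException` to the existing multi-catch.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained sketch; not the actual HandleHttpRequest code.
final class MultipartSizeLimitSketch {

    static final int OK = 200;
    static final int BAD_REQUEST = 400;

    /** Hypothetical stand-in for HttpServletRequest.getParts(). */
    interface PartSource {
        List<String> getParts() throws IOException;
    }

    /** Returns the HTTP status the processor would answer with. */
    static int handle(final PartSource request) {
        try {
            final List<String> parts = request.getParts();
            // ... one FlowFile per part would be created and forwarded here ...
            return OK;
        } catch (IOException | IllegalStateException e) {
            // Jetty reports an oversized request as IllegalStateException, so it is
            // caught together with the I/O failure (the real code also catches
            // ServletException). This is where handleFlowContentStreamingError(...)
            // would answer 400 so that the client stops uploading.
            return BAD_REQUEST;
        }
    }

    public static void main(final String[] args) {
        final int normal = handle(() -> Arrays.asList("part1", "part2"));
        final int tooLarge = handle(() -> {
            throw new IllegalStateException("Request exceeds maxRequestSize (1048576)");
        });
        System.out.println(normal + " " + tooLarge);
    }
}
```

Catching only `IllegalStateException` around `getParts()`, rather than a blanket `RuntimeException`, keeps unrelated programming errors visible in the log instead of turning them into a 400 response.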
---