This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 2bfffd3 NIFI-6317: HandleHttpRequest timeout handling issues
2bfffd3 is described below
commit 2bfffd35b2d6785f6b15f4314b462a1430c6bc39
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu May 23 22:45:55 2019 +0200
NIFI-6317: HandleHttpRequest timeout handling issues
- cap the cleanup thread's scheduling delay at 5 seconds in order to prevent
stale requests from staying in the cache for a long time
- disable AsyncContext timeout handling in HandleHttpRequest in order to
prevent a new flowfile from being generated at request expiration
- use HttpServletResponse.SC_* status codes everywhere in HandleHttpRequest
and remove the javax.ws.rs import
- add some more error logging to make bug investigations easier
- add a short message to 503 error responses to make bug investigations
easier
This closes #3490.
Signed-off-by: Mark Payne <[email protected]>
---
.../processors/standard/HandleHttpRequest.java | 28 ++++++++++++----------
.../apache/nifi/http/StandardHttpContextMap.java | 11 +++++++--
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 897a431..9815816 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -65,7 +65,6 @@ import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
-import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
@@ -430,7 +429,7 @@ public class HandleHttpRequest extends AbstractProcessor {
if
(!allowedMethods.contains(request.getMethod().toUpperCase())) {
getLogger().info("Sending back METHOD_NOT_ALLOWED response
to {}; method was {}; request URI was {}",
new Object[]{request.getRemoteAddr(),
request.getMethod(), requestUri});
-
response.sendError(Status.METHOD_NOT_ALLOWED.getStatusCode());
+
response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
return;
}
@@ -443,34 +442,39 @@ public class HandleHttpRequest extends AbstractProcessor {
}
if (!pathPattern.matcher(uri.getPath()).matches()) {
- response.sendError(Status.NOT_FOUND.getStatusCode());
getLogger().info("Sending back NOT_FOUND response to
{}; request was {} {}",
new Object[]{request.getRemoteAddr(),
request.getMethod(), requestUri});
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
}
// If destination queues full, send back a 503: Service
Unavailable.
if (context.getAvailableRelationships().isEmpty()) {
-
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ getLogger().warn("Request from {} cannot be processed,
processor downstream queue is full; responding with SERVICE_UNAVAILABLE",
+ new Object[]{request.getRemoteAddr()});
+
+
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue
is full");
return;
}
// Right now, that information, though, is only in the
ProcessSession, not the ProcessContext,
// so it is not known to us. Should see if it can be added to
the ProcessContext.
final AsyncContext async = baseRequest.startAsync();
- async.setTimeout(requestTimeout);
+
+ // disable timeout handling on AsyncContext, timeout will be
handled in HttpContextMap
+ async.setTimeout(0);
+
final boolean added = containerQueue.offer(new
HttpRequestContainer(request, response, async));
if (added) {
getLogger().debug("Added Http Request to queue for {} {}
from {}",
new Object[]{request.getMethod(), requestUri,
request.getRemoteAddr()});
} else {
- getLogger().info("Sending back a SERVICE_UNAVAILABLE
response to {}; request was {} {}",
- new Object[]{request.getRemoteAddr(),
request.getMethod(), request.getRemoteAddr()});
+ getLogger().warn("Request from {} cannot be processed,
container queue is full; responding with SERVICE_UNAVAILABLE",
+ new Object[]{request.getRemoteAddr()});
-
response.sendError(Status.SERVICE_UNAVAILABLE.getStatusCode());
- response.flushBuffer();
+
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue
is full");
async.complete();
}
}
@@ -765,8 +769,7 @@ public class HandleHttpRequest extends AbstractProcessor {
new Object[]{request.getRemoteAddr()});
try {
-
container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
- container.getResponse().flushBuffer();
+
container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"HttpContextMap is full");
container.getContext().complete();
} catch (final Exception e) {
getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message
to {} due to {}",
@@ -791,8 +794,7 @@ public class HandleHttpRequest extends AbstractProcessor {
try {
HttpServletResponse response = container.getResponse();
- response.sendError(Status.BAD_REQUEST.getStatusCode());
- response.flushBuffer();
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST);
container.getContext().complete();
} catch (final IOException ioe) {
getLogger().warn("Failed to send HTTP response to {} due to {}",
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
index c25ddbf..c143ae8 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
@@ -67,6 +67,8 @@ public class StandardHttpContextMap extends
AbstractControllerService implements
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
+ private static final long CLEANUP_MAX_DELAY_NANOS = 5_000_000_000L;
+
private final ConcurrentMap<String, Wrapper> wrapperMap = new
ConcurrentHashMap<>();
private volatile int maxSize = 5000;
@@ -94,7 +96,7 @@ public class StandardHttpContextMap extends
AbstractControllerService implements
});
maxRequestNanos =
context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
- final long scheduleNanos = maxRequestNanos / 2;
+ final long scheduleNanos = Math.min(maxRequestNanos / 2,
CLEANUP_MAX_DELAY_NANOS);
executor.scheduleWithFixedDelay(new CleanupExpiredRequests(),
scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
}
@@ -185,11 +187,16 @@ public class StandardHttpContextMap extends
AbstractControllerService implements
// send SERVICE_UNAVAILABLE
try {
final AsyncContext async = entry.getValue().getAsync();
- ((HttpServletResponse)
async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+
+ getLogger().warn("Request from {} timed out;
responding with SERVICE_UNAVAILABLE",
+ new
Object[]{async.getRequest().getRemoteAddr()});
+
+ ((HttpServletResponse)
async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Timeout occurred");
async.complete();
} catch (final Exception e) {
// we are trying to indicate that we are unavailable.
If we have an exception and cannot respond,
// then so be it. Nothing to really do here.
+ getLogger().error("Failed to respond with
SERVICE_UNAVAILABLE message", e);
}
}
}