This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bfffd3  NIFI-6317: HandleHttpRequest timeout handling issues
2bfffd3 is described below

commit 2bfffd35b2d6785f6b15f4314b462a1430c6bc39
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu May 23 22:45:55 2019 +0200

    NIFI-6317: HandleHttpRequest timeout handling issues
    
    - cap the cleanup thread scheduling delay at 5 seconds in order to prevent
      stale requests from staying in the cache for a long time
    - disable AsyncContext timeout handling in HandleHttpRequest in order to
      prevent a new flowfile from being generated at request expiration
    - use HttpServletResponse.SC_* status codes everywhere in HandleHttpRequest
      and remove the javax.ws.rs import
    - add some more error logging to make bug investigations easier
    - add a short message to 503 error responses to make bug investigations 
easier
    
    This closes #3490.
    
    Signed-off-by: Mark Payne <[email protected]>
---
 .../processors/standard/HandleHttpRequest.java     | 28 ++++++++++++----------
 .../apache/nifi/http/StandardHttpContextMap.java   | 11 +++++++--
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 897a431..9815816 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -65,7 +65,6 @@ import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.Part;
-import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
@@ -430,7 +429,7 @@ public class HandleHttpRequest extends AbstractProcessor {
                 if 
(!allowedMethods.contains(request.getMethod().toUpperCase())) {
                     getLogger().info("Sending back METHOD_NOT_ALLOWED response 
to {}; method was {}; request URI was {}",
                             new Object[]{request.getRemoteAddr(), 
request.getMethod(), requestUri});
-                    
response.sendError(Status.METHOD_NOT_ALLOWED.getStatusCode());
+                    
response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
                     return;
                 }
 
@@ -443,34 +442,39 @@ public class HandleHttpRequest extends AbstractProcessor {
                     }
 
                     if (!pathPattern.matcher(uri.getPath()).matches()) {
-                        response.sendError(Status.NOT_FOUND.getStatusCode());
                         getLogger().info("Sending back NOT_FOUND response to 
{}; request was {} {}",
                                 new Object[]{request.getRemoteAddr(), 
request.getMethod(), requestUri});
+                        response.sendError(HttpServletResponse.SC_NOT_FOUND);
                         return;
                     }
                 }
 
                 // If destination queues full, send back a 503: Service 
Unavailable.
                 if (context.getAvailableRelationships().isEmpty()) {
-                    
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                    getLogger().warn("Request from {} cannot be processed, 
processor downstream queue is full; responding with SERVICE_UNAVAILABLE",
+                            new Object[]{request.getRemoteAddr()});
+
+                    
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue 
is full");
                     return;
                 }
 
                 // Right now, that information, though, is only in the 
ProcessSession, not the ProcessContext,
                 // so it is not known to us. Should see if it can be added to 
the ProcessContext.
                 final AsyncContext async = baseRequest.startAsync();
-                async.setTimeout(requestTimeout);
+
+                // disable timeout handling on AsyncContext, timeout will be 
handled in HttpContextMap
+                async.setTimeout(0);
+
                 final boolean added = containerQueue.offer(new 
HttpRequestContainer(request, response, async));
 
                 if (added) {
                     getLogger().debug("Added Http Request to queue for {} {} 
from {}",
                             new Object[]{request.getMethod(), requestUri, 
request.getRemoteAddr()});
                 } else {
-                    getLogger().info("Sending back a SERVICE_UNAVAILABLE 
response to {}; request was {} {}",
-                            new Object[]{request.getRemoteAddr(), 
request.getMethod(), request.getRemoteAddr()});
+                    getLogger().warn("Request from {} cannot be processed, 
container queue is full; responding with SERVICE_UNAVAILABLE",
+                            new Object[]{request.getRemoteAddr()});
 
-                    
response.sendError(Status.SERVICE_UNAVAILABLE.getStatusCode());
-                    response.flushBuffer();
+                    
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue 
is full");
                     async.complete();
                 }
             }
@@ -765,8 +769,7 @@ public class HandleHttpRequest extends AbstractProcessor {
             new Object[]{request.getRemoteAddr()});
 
         try {
-          
container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
-          container.getResponse().flushBuffer();
+          
container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
"HttpContextMap is full");
           container.getContext().complete();
         } catch (final Exception e) {
           getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message 
to {} due to {}",
@@ -791,8 +794,7 @@ public class HandleHttpRequest extends AbstractProcessor {
 
       try {
           HttpServletResponse response = container.getResponse();
-          response.sendError(Status.BAD_REQUEST.getStatusCode());
-          response.flushBuffer();
+          response.sendError(HttpServletResponse.SC_BAD_REQUEST);
           container.getContext().complete();
       } catch (final IOException ioe) {
           getLogger().warn("Failed to send HTTP response to {} due to {}",
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
index c25ddbf..c143ae8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
@@ -67,6 +67,8 @@ public class StandardHttpContextMap extends 
AbstractControllerService implements
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
+    private static final long CLEANUP_MAX_DELAY_NANOS = 5_000_000_000L;
+
     private final ConcurrentMap<String, Wrapper> wrapperMap = new 
ConcurrentHashMap<>();
 
     private volatile int maxSize = 5000;
@@ -94,7 +96,7 @@ public class StandardHttpContextMap extends 
AbstractControllerService implements
         });
 
         maxRequestNanos = 
context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
-        final long scheduleNanos = maxRequestNanos / 2;
+        final long scheduleNanos = Math.min(maxRequestNanos / 2, 
CLEANUP_MAX_DELAY_NANOS);
         executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), 
scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
     }
 
@@ -185,11 +187,16 @@ public class StandardHttpContextMap extends 
AbstractControllerService implements
                     // send SERVICE_UNAVAILABLE
                     try {
                         final AsyncContext async = entry.getValue().getAsync();
-                        ((HttpServletResponse) 
async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+
+                        getLogger().warn("Request from {} timed out; 
responding with SERVICE_UNAVAILABLE",
+                                new 
Object[]{async.getRequest().getRemoteAddr()});
+
+                        ((HttpServletResponse) 
async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
"Timeout occurred");
                         async.complete();
                     } catch (final Exception e) {
                         // we are trying to indicate that we are unavailable. 
If we have an exception and cannot respond,
                         // then so be it. Nothing to really do here.
+                        getLogger().error("Failed to respond with 
SERVICE_UNAVAILABLE message", e);
                     }
                 }
             }

Reply via email to