This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a4027e8 NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()
a4027e8 is described below
commit a4027e8e77facdf94817e343d88ecbfeeacbf7fc
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jan 7 10:15:44 2021 -0500
NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()
NIFI-8120 Renamed exception variable and reordered log statements
This closes #4747.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../processors/standard/HandleHttpResponse.java | 24 +--
.../standard/TestHandleHttpResponse.java | 171 ++++++++++++---------
2 files changed, 113 insertions(+), 82 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
index 41c6ecd..a0d3f4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
@@ -65,8 +65,6 @@ import org.apache.nifi.util.StopWatch;
@SeeAlso(value = {HandleHttpRequest.class}, classNames =
{"org.apache.nifi.http.StandardHttpContextMap",
"org.apache.nifi.ssl.StandardSSLContextService"})
public class HandleHttpResponse extends AbstractProcessor {
- public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
-
public static final PropertyDescriptor STATUS_CODE = new
PropertyDescriptor.Builder()
.name("HTTP Status Code")
.description("The HTTP Status Code to use when responding to the
HTTP Request. See Section 10 of RFC 2616 for more information.")
@@ -136,25 +134,25 @@ public class HandleHttpResponse extends AbstractProcessor
{
final String contextIdentifier =
flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
if (contextIdentifier == null) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().warn("Failed to respond to HTTP request for {} because
FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
final String statusCodeValue =
context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
if (!isNumber(statusCodeValue)) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {}
because status code was '{}', which is not a valid number", new
Object[]{flowFile, statusCodeValue});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
final HttpContextMap contextMap =
context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final HttpServletResponse response =
contextMap.getResponse(contextIdentifier);
if (response == null) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {}
because FlowFile had an '{}' attribute of {} but could not find an HTTP
Response Object for this identifier",
new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID,
contextIdentifier});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -192,27 +190,31 @@ public class HandleHttpResponse extends AbstractProcessor
{
session.exportTo(flowFile, response.getOutputStream());
response.flushBuffer();
} catch (final ProcessException e) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to
{}", new Object[]{flowFile, e});
- contextMap.complete(contextIdentifier);
+ try {
+ contextMap.complete(contextIdentifier);
+ } catch (final RuntimeException ce) {
+ getLogger().error("Failed to complete HTTP Transaction for {}
due to {}", new Object[]{flowFile, ce});
+ }
+ session.transfer(flowFile, REL_FAILURE);
return;
} catch (final Exception e) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to
{}", new Object[]{flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
try {
contextMap.complete(contextIdentifier);
- } catch (final IllegalStateException ise) {
- getLogger().error("Failed to complete HTTP Transaction for {} due
to {}", new Object[]{flowFile, ise});
+ } catch (final RuntimeException ce) {
+ getLogger().error("Failed to complete HTTP Transaction for {} due
to {}", new Object[]{flowFile, ce});
session.transfer(flowFile, REL_FAILURE);
return;
}
session.getProvenanceReporter().send(flowFile,
HTTPUtils.getURI(flowFile.getAttributes()),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully responded to HTTP Request for {} with
status code {}", new Object[]{flowFile, statusCode});
+ session.transfer(flowFile, REL_SUCCESS);
}
private static boolean isNumber(final String value) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
index 02b1d1c..a2d6b28 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -49,46 +49,52 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestHandleHttpResponse {
+ private static final String CONTEXT_MAP_ID =
MockHttpContextMap.class.getSimpleName();
+
+ private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier";
+
+ private static final int HTTP_STATUS_CREATED =
HttpServletResponse.SC_CREATED;
+
+ private static final String FLOW_FILE_CONTENT = "TESTING";
+
@Test
public void testEnsureCompleted() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id",
"");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
"http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS,
1);
- assertTrue(runner.getProvenanceEvents().size() == 1);
+ assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND,
runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test",
runner.getProvenanceEvents().get(0).getTransitUri());
- assertEquals("hello", contextMap.baos.toString());
+ assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("no-valid-attr"));
- assertEquals(201, contextMap.statusCode);
+ assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@@ -97,15 +103,15 @@ public class TestHandleHttpResponse {
public void testRegexHeaders() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id",
"");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
"http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX,
"^(my.*)$");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
@@ -113,43 +119,43 @@ public class TestHandleHttpResponse {
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("my-blank-attr", "");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS,
1);
- assertTrue(runner.getProvenanceEvents().size() == 1);
+ assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND,
runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test",
runner.getProvenanceEvents().get(0).getTransitUri());
- assertEquals("hello", contextMap.baos.toString());
+ assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("my-blank-attr"));
- assertEquals(201, contextMap.statusCode);
+ assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@Test
- public void testWithExceptionThrown() throws InitializationException {
+ public void testResponseFlowFileAccessException() throws
InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id",
"FlowFileAccessException");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access
Problem"), null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
"http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -158,23 +164,23 @@ public class TestHandleHttpResponse {
}
@Test
- public void testCannotWriteResponse() throws InitializationException {
+ public void testResponseProcessException() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id",
"ProcessException");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
"http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -183,20 +189,45 @@ public class TestHandleHttpResponse {
}
@Test
+ public void testResponseProcessExceptionThenIllegalStateException() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
+
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new
IllegalStateException());
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
+ runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
+ runner.setProperty("my-attr", "${my-attr}");
+ runner.setProperty("no-valid-attr", "${no-valid-attr}");
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
+ attributes.put("my-attr", "hello");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
+
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE,
1);
+ assertEquals(0, contextMap.getCompletionCount());
+ }
+
+ @Test
public void testStatusCodeEmpty() throws InitializationException {
final TestRunner runner =
TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id",
"");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new
MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
"http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP,
CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -208,16 +239,18 @@ public class TestHandleHttpResponse {
private final String id;
private final AtomicInteger completedCount = new AtomicInteger(0);
- private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
private final ConcurrentMap<String, String> headersSent = new
ConcurrentHashMap<>();
- private final String shouldThrowExceptionClass;
+ private final Exception responseException;
+ private final RuntimeException completeException;
private volatile int statusCode = -1;
private final List<String> headersWithNoValue = new
CopyOnWriteArrayList<>();
- public MockHttpContextMap(final String expectedIdentifier, final
String shouldThrowExceptionClass) {
+ public MockHttpContextMap(final String expectedIdentifier, final
Exception responseException, final RuntimeException completeException) {
this.id = expectedIdentifier;
- this.shouldThrowExceptionClass = shouldThrowExceptionClass;
+ this.responseException = responseException;
+ this.completeException = completeException;
}
@Override
@@ -233,11 +266,7 @@ public class TestHandleHttpResponse {
try {
final HttpServletResponse response =
Mockito.mock(HttpServletResponse.class);
- if(shouldThrowExceptionClass != null &&
shouldThrowExceptionClass.equals("FlowFileAccessException")) {
- Mockito.when(response.getOutputStream()).thenThrow(new
FlowFileAccessException("exception"));
- } else if(shouldThrowExceptionClass != null &&
shouldThrowExceptionClass.equals("ProcessException")) {
- Mockito.when(response.getOutputStream()).thenThrow(new
ProcessException("exception"));
- } else {
+ if (responseException == null) {
Mockito.when(response.getOutputStream()).thenReturn(new
ServletOutputStream() {
@Override
public boolean isReady() {
@@ -249,43 +278,39 @@ public class TestHandleHttpResponse {
}
@Override
- public void write(int b) throws IOException {
- baos.write(b);
+ public void write(int b) {
+ outputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
- baos.write(b);
+ outputStream.write(b);
}
@Override
- public void write(byte[] b, int off, int len) throws
IOException {
- baos.write(b, off, len);
+ public void write(byte[] b, int off, int len) {
+ outputStream.write(b, off, len);
}
});
+ } else {
+
Mockito.when(response.getOutputStream()).thenThrow(responseException);
}
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation)
throws Throwable {
- final String key = invocation.getArgument(0);
- final String value = invocation.getArgument(1);
- if (value == null) {
- headersWithNoValue.add(key);
- } else {
- headersSent.put(key, value);
- }
-
- return null;
+ Mockito.doAnswer(invocation -> {
+ final String key = invocation.getArgument(0);
+ final String value = invocation.getArgument(1);
+ if (value == null) {
+ headersWithNoValue.add(key);
+ } else {
+ headersSent.put(key, value);
}
+
+ return null;
}).when(response).setHeader(Mockito.any(String.class),
Mockito.any(String.class));
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation)
throws Throwable {
- statusCode = invocation.getArgument(0);
- return null;
- }
+ Mockito.doAnswer(invocation -> {
+ statusCode = invocation.getArgument(0);
+ return null;
}).when(response).setStatus(Mockito.anyInt());
return response;
@@ -302,6 +327,10 @@ public class TestHandleHttpResponse {
Assert.fail("attempting to respond to wrong request; should
have been " + id + " but was " + identifier);
}
+ if (completeException != null) {
+ throw completeException;
+ }
+
completedCount.incrementAndGet();
}