NIFI-259 Corrected GetHttp state management and added a new unit test
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88d4d2ce Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88d4d2ce Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88d4d2ce Branch: refs/heads/master Commit: 88d4d2ce5fe421b6e8955a7b875ad3e01195bd13 Parents: 55b77fe Author: jpercivall <[email protected]> Authored: Wed Feb 3 20:04:39 2016 -0500 Committer: jpercivall <[email protected]> Committed: Wed Feb 3 20:04:39 2016 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/GetHTTP.java | 98 ++++++++++++++++---- .../nifi/processors/standard/TestGetHTTP.java | 84 +++++++++++++++-- 2 files changed, 156 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 48f22c5..818cd3b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -75,6 +75,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import 
org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; @@ -91,11 +92,14 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.Tuple; @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports it, the Processor then stores the Last Modified time and the ETag " - + "so that data will not be pulled again until the remote data changes or until the state is cleared.") + + "so that data will not be pulled again until the remote data changes or until the state is cleared. Note that due to limitations on state " + + "management, stored \"last modified\" and etag fields never expire. If the URL in GetHttp uses Expression Language that is unbounded, there " + + "is the potential for Out of Memory Errors to occur.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header") @@ -400,16 +404,18 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { final HttpGet get = new HttpGet(url); get.setConfig(requestConfigBuilder.build()); + final StateMap beforeStateMap; + try { - final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL); - final String lastModified = stateMap.get(LAST_MODIFIED); + beforeStateMap = context.getStateManager().getState(Scope.LOCAL); + final String lastModified = beforeStateMap.get(LAST_MODIFIED+":" + url); if (lastModified != null) { - get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified); + get.addHeader(HEADER_IF_MODIFIED_SINCE, 
parseStateValue(lastModified).getValue()); } - final String etag = stateMap.get(ETAG); + final String etag = beforeStateMap.get(ETAG+":" + url); if (etag != null) { - get.addHeader(HEADER_IF_NONE_MATCH, etag); + get.addHeader(HEADER_IF_NONE_MATCH, parseStateValue(etag).getValue()); } } catch (final IOException ioe) { throw new ProcessException(ioe); @@ -461,20 +467,8 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate}); session.commit(); - final Map<String, String> updatedState = new HashMap<>(2); - final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED); - if (lastModified != null) { - updatedState.put(LAST_MODIFIED, lastModified.getValue()); - } + updateStateMap(context,response,beforeStateMap,url); - final Header etag = response.getFirstHeader(HEADER_ETAG); - if (etag != null) { - updatedState.put(ETAG, etag.getValue()); - } - - if (!updatedState.isEmpty()) { - context.getStateManager().setState(updatedState, Scope.LOCAL); - } } catch (final IOException e) { context.yield(); session.rollback(); @@ -490,4 +484,70 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { conMan.shutdown(); } } + + private void updateStateMap(ProcessContext context, HttpResponse response, StateMap beforeStateMap, String url){ + try { + Map<String,String> workingMap = new HashMap<>(); + workingMap.putAll(beforeStateMap.toMap()); + final StateManager stateManager = context.getStateManager(); + StateMap oldValue = beforeStateMap; + + long currentTime = System.currentTimeMillis(); + + final Header receivedLastModified = response.getFirstHeader(HEADER_LAST_MODIFIED); + if (receivedLastModified != null) { + workingMap.put(LAST_MODIFIED + ":" + url, currentTime+":"+receivedLastModified.getValue()); + } + + final Header receivedEtag = response.getFirstHeader(HEADER_ETAG); + if (receivedEtag != null) { + workingMap.put(ETAG + 
":" + url, currentTime+":"+receivedEtag.getValue()); + } + + boolean replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL); + boolean changed; + + while(!replaceSucceeded){ + oldValue = stateManager.getState(Scope.LOCAL); + workingMap.clear(); + workingMap.putAll(oldValue.toMap()); + + changed = false; + + if(receivedLastModified != null){ + Tuple<String,String> storedLastModifiedTuple = parseStateValue(workingMap.get(LAST_MODIFIED+":"+url)); + + if(Long.parseLong(storedLastModifiedTuple.getKey()) < currentTime){ + workingMap.put(LAST_MODIFIED + ":" + url, currentTime+":"+receivedLastModified.getValue()); + changed = true; + } + } + + if(receivedEtag != null){ + Tuple<String,String> storedLastModifiedTuple = parseStateValue(workingMap.get(ETAG+":"+url)); + + if(Long.parseLong(storedLastModifiedTuple.getKey()) < currentTime){ + workingMap.put(ETAG + ":" + url, currentTime+":"+receivedEtag.getValue()); + changed = true; + } + } + + if(changed) { + replaceSucceeded = stateManager.replace(oldValue, workingMap, Scope.LOCAL); + } else { + break; + } + } + } catch (final IOException ioe) { + throw new ProcessException(ioe); + } + } + + protected static Tuple<String, String> parseStateValue(String mapValue){ + int indexOfColon = mapValue.indexOf(":"); + + String timestamp = mapValue.substring(0,indexOfColon); + String value = mapValue.substring(indexOfColon+1); + return new Tuple<>(timestamp,value); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/88d4d2ce/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java index 428c811..f8e4122 
100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java @@ -32,6 +32,7 @@ import org.junit.Test; import java.util.HashMap; import java.util.Map; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -77,21 +78,21 @@ public class TestGetHTTP { controller.run(2); // verify the lastModified and entityTag are updated - controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, "", Scope.LOCAL); - controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination, "", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); // ran twice, but got one...which is good controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); // verify remote.source flowfile attribute controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost"); - controller.clearTransferState(); // turn off checking for etag and lastModified RESTServiceContentModified.IGNORE_ETAG = true; RESTServiceContentModified.IGNORE_LAST_MODIFIED = true; controller.run(2); + // ran twice, got two...which is good controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2); controller.clearTransferState(); @@ -114,30 +115,99 @@ public class TestGetHTTP { RESTServiceContentModified.IGNORE_ETAG = false; RESTServiceContentModified.ETAG = 1; controller.run(2); + // ran twice, got 1...but should have new cached etag controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); - controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", Scope.LOCAL); + String eTagStateValue = 
controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.ETAG+":"+destination); + assertEquals("1",GetHTTP.parseStateValue(eTagStateValue).getValue()); controller.clearTransferState(); // turn off checking for Etag, turn on checking for lastModified, but change value RESTServiceContentModified.IGNORE_LAST_MODIFIED = false; RESTServiceContentModified.IGNORE_ETAG = true; RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000; - String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED); + String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED+":"+destination); controller.run(2); + // ran twice, got 1...but should have new cached etag controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); - controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, lastMod, Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination, lastMod, Scope.LOCAL); controller.clearTransferState(); - // shutdown web service } finally { + // shutdown web service server.shutdownServer(); } } @Test + public final void testContentModifiedTwoServers() throws Exception { + // set up web services + ServletHandler handler1 = new ServletHandler(); + handler1.addServletWithMapping(RESTServiceContentModified.class, "/*"); + + ServletHandler handler2 = new ServletHandler(); + handler2.addServletWithMapping(RESTServiceContentModified.class, "/*"); + + // create the services + TestServer server1 = new TestServer(); + server1.addHandler(handler1); + + TestServer server2 = new TestServer(); + server2.addHandler(handler2); + + try { + server1.startServer(); + server2.startServer(); + + // this is the base urls with the random ports + String destination1 = server1.getUrl(); + String destination2 = server2.getUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + 
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.URL, destination1); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + + controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination1, Scope.LOCAL); + controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination1, Scope.LOCAL); + controller.run(2); + + // verify the lastModified and entityTag are updated + controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); + + // ran twice, but got one...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + + controller.clearTransferState(); + + controller.setProperty(GetHTTP.URL, destination2); + controller.getStateManager().assertStateNotSet(GetHTTP.ETAG+":"+destination2, Scope.LOCAL); + controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED+":"+destination2, Scope.LOCAL); + + controller.run(2); + + // ran twice, but got one...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + + // verify the lastModified's and entityTags are updated + controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination1, "", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination1, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG+":"+destination2, "", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED+":"+destination2, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); + + } finally { + // shutdown web services + server1.shutdownServer(); + server2.shutdownServer(); + } + } + + @Test public final void testUserAgent() throws Exception { // set 
up web service ServletHandler handler = new ServletHandler();
