Repository: nifi Updated Branches: refs/heads/NIFI-259 [created] 57dadb728
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 5eab704..d3eb515 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -29,22 +28,16 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; -import java.util.Properties; +import java.util.Map; import java.util.Set; -import java.util.TimeZone; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import javax.net.ssl.SSLContext; @@ -76,11 +69,12 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnRemoved; -import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -202,30 +196,14 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd MMM yyyy HH:mm:ss zzz"; // package access to enable unit testing - static final String UNINITIALIZED_LAST_MODIFIED_VALUE; - - private static final String HTTP_CACHE_FILE_PREFIX = "conf/.httpCache-"; - static final String ETAG = "ETag"; - static final String LAST_MODIFIED = "LastModified"; - static { - final SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L)); - } - final AtomicReference<String> lastModifiedRef = new AtomicReference<>(UNINITIALIZED_LAST_MODIFIED_VALUE); - final AtomicReference<String> entityTagRef = new AtomicReference<>(""); - // end private Set<Relationship> relationships; private List<PropertyDescriptor> properties; - private volatile long timeToPersist = 0; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReadLock readLock = lock.readLock(); - private final WriteLock writeLock = lock.writeLock(); + private final AtomicBoolean clearState = new AtomicBoolean(false); @Override protected void init(final ProcessorInitializationContext context) { @@ -247,16 +225,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { properties.add(PROXY_HOST); properties.add(PROXY_PORT); this.properties = Collections.unmodifiableList(properties); - - // load etag and lastModified from file - final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); - try (FileInputStream fis = new FileInputStream(httpCache)) { - final Properties props = new Properties(); - props.load(fis); - entityTagRef.set(props.getProperty(ETAG)); - lastModifiedRef.set(props.getProperty(LAST_MODIFIED)); - } catch (final IOException swallow) { - } } @Override @@ -271,28 +239,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - entityTagRef.set(""); - lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE); - } - - @OnStopped - public void onStopped() { - final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); - try (FileOutputStream fos = new FileOutputStream(httpCache)) { - final Properties props = new Properties(); - props.setProperty(ETAG, entityTagRef.get()); - props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); - props.store(fos, "GetHTTP file modification values"); - } catch (final IOException swallow) { - } - + clearState.set(true); } - @OnRemoved - public void onRemoved() { - final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); - if (httpCache.exists()) { - httpCache.delete(); + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + if (clearState.getAndSet(false)) { + context.getStateManager().clear(Scope.LOCAL); } } @@ -444,8 +397,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { final HttpGet get = new HttpGet(url); get.setConfig(requestConfigBuilder.build()); - get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get()); - get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get()); + try { + final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL); + final String lastModified = stateMap.get(LAST_MODIFIED); + if (lastModified != null) { + get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified); + } + + final String etag = stateMap.get(ETAG); + if (etag != null) { + get.addHeader(HEADER_IF_NONE_MATCH, etag); + } + } catch (final IOException ioe) { + throw new ProcessException(ioe); + } final String accept = context.getProperty(ACCEPT_CONTENT_TYPE).getValue(); if (accept != null) { @@ -492,41 +457,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { session.transfer(flowFile, REL_SUCCESS); logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate}); session.commit(); + + final Map<String, String> updatedState = new HashMap<>(2); final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED); if (lastModified != null) { - lastModifiedRef.set(lastModified.getValue()); + updatedState.put(LAST_MODIFIED, lastModified.getValue()); } final Header etag = response.getFirstHeader(HEADER_ETAG); if (etag != null) { - entityTagRef.set(etag.getValue()); + updatedState.put(ETAG, etag.getValue()); } - if ((etag != null || lastModified != null) && readLock.tryLock()) { - try { - if (timeToPersist < System.currentTimeMillis()) { - readLock.unlock(); - writeLock.lock(); - try { - if (timeToPersist < System.currentTimeMillis()) { - timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC; - final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); - try (FileOutputStream fos = new FileOutputStream(httpCache)) { - final Properties props = new Properties(); - props.setProperty(ETAG, entityTagRef.get()); - props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); - props.store(fos, "GetHTTP file modification values"); - } catch (final IOException e) { - getLogger().error("Failed to persist ETag and LastMod due to " + e, e); - } - } - } finally { - readLock.lock(); - writeLock.unlock(); - } - } - } finally { - readLock.unlock(); - } + + if (!updatedState.isEmpty()) { + context.getStateManager().setState(updatedState, Scope.LOCAL); } } catch (final IOException e) { context.yield(); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index ba84939..37316b6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -26,8 +26,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -35,6 +37,7 @@ import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.ListableEntity; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; @@ -96,7 +99,7 @@ public class TestAbstractListProcessor { } @Test - public void testStateStoredInDistributedService() throws InitializationException { + public void testStateStoredInClusterStateManagement() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); final DistributedCache cache = new DistributedCache(); @@ -109,7 +112,32 @@ public class TestAbstractListProcessor { proc.addEntity("name", "id", 1492L); runner.run(); - assertEquals(1, cache.stored.size()); + final Map<String, String> expectedState = new HashMap<>(); + expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test + public void testStateMigrated() throws InitializationException { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + final DistributedCache cache = new DistributedCache(); + runner.addControllerService("cache", cache); + runner.enableControllerService(cache); + runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); + + final String serviceState = "{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; + final String cacheKey = runner.getProcessor().getIdentifier() + ".lastListingTime./path"; + cache.stored.put(cacheKey, serviceState); + + runner.run(); + + final MockStateManager stateManager = runner.getStateManager(); + final Map<String, String> expectedState = new HashMap<>(); + expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + stateManager.assertStateEquals(expectedState, Scope.CLUSTER); } @Test @@ -174,7 +202,7 @@ public class TestAbstractListProcessor { @Override protected File getPersistenceFile() { - return new File("target/ListProcessor-local-state.json"); + return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"); } public void addEntity(final String name, final String identifier, final long timestamp) { http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 12a5cd4..e09d6f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -16,16 +16,12 @@ */ package org.apache.nifi.processors.standard; -import java.io.File; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.SerializationException; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; @@ -34,25 +30,21 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.apache.nifi.util.MockProcessorLog; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TestDetectDuplicate { - private static Logger LOGGER; - static { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug"); - LOGGER = LoggerFactory.getLogger(TestDetectDuplicate.class); } @Test @@ -114,7 +106,7 @@ public class TestDetectDuplicate { final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); final ComponentLog logger = new MockProcessorLog("client", client); - final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger); + final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager()); client.initialize(clientInitContext); return client; @@ -154,6 +146,7 @@ public class TestDetectDuplicate { } @Override + @SuppressWarnings("unchecked") public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException { if (exists) { @@ -183,33 +176,4 @@ public class TestDetectDuplicate { public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { } } - - private static class StringSerializer implements Serializer<String> { - - @Override - public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { - output.write(value.getBytes(StandardCharsets.UTF_8)); - } - } - - private static void deleteRecursively(final File dataFile) throws IOException { - if (dataFile == null || !dataFile.exists()) { - return; - } - - final File[] children = dataFile.listFiles(); - for (final File child : children) { - if (child.isDirectory()) { - deleteRecursively(child); - } else { - for (int i = 0; i < 100 && child.exists(); i++) { - child.delete(); - } - - if (child.exists()) { - throw new IOException("Could not delete " + dataFile.getAbsolutePath()); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java index 5aec796..9c1d083 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java @@ -16,24 +16,18 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; -import java.io.FileInputStream; import java.util.HashMap; import java.util.Map; -import java.util.Properties; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.eclipse.jetty.servlet.ServletHandler; @@ -97,14 +91,14 @@ public class TestGetHTTP { controller.setProperty(GetHTTP.FILENAME, "testFile"); controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); - GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor(); - assertEquals("", getHTTPProcessor.entityTagRef.get()); - assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get()); + controller.getStateManager().assertStateNotSet(GetHTTP.ETAG, Scope.LOCAL); + controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED, Scope.LOCAL); controller.run(2); // verify the lastModified and entityTag are updated - assertFalse("".equals(getHTTPProcessor.entityTagRef.get())); - assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(getHTTPProcessor.lastModifiedRef.get())); + controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, "", Scope.LOCAL); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 01 Jan 1970 00:00:00 GMT", Scope.LOCAL); + // ran twice, but got one...which is good controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); @@ -141,18 +135,18 @@ public class TestGetHTTP { controller.run(2); // ran twice, got 1...but should have new cached etag controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); - assertEquals("1", getHTTPProcessor.entityTagRef.get()); + controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", Scope.LOCAL); controller.clearTransferState(); // turn off checking for Etag, turn on checking for lastModified, but change value RESTServiceContentModified.IGNORE_LAST_MODIFIED = false; RESTServiceContentModified.IGNORE_ETAG = true; RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000; - String lastMod = getHTTPProcessor.lastModifiedRef.get(); + String lastMod = controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED); controller.run(2); // ran twice, got 1...but should have new cached etag controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); - assertFalse(lastMod.equals(getHTTPProcessor.lastModifiedRef.get())); + controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, lastMod, Scope.LOCAL); controller.clearTransferState(); // shutdown web service @@ -161,90 +155,6 @@ public class TestGetHTTP { } } - @Test - public void testPersistEtagLastMod() throws Exception { - // delete the config file - File confDir = new File("conf"); - File[] files = confDir.listFiles(); - for (File file : files) { - assertTrue("Failed to delete " + file.getName(), file.delete()); - } - - // set up web service - ServletHandler handler = new ServletHandler(); - handler.addServletWithMapping(RESTServiceContentModified.class, "/*"); - - // create the service - TestServer server = new TestServer(); - server.addHandler(handler); - - try { - server.startServer(); - - // get the server url - String destination = server.getUrl(); - - // set up NiFi mock controller - controller = TestRunners.newTestRunner(GetHTTP.class); - controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); - controller.setProperty(GetHTTP.FILENAME, "testFile"); - controller.setProperty(GetHTTP.URL, destination); - controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); - - GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor(); - - assertEquals("", getHTTPProcessor.entityTagRef.get()); - assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get()); - controller.run(2); - - // verify the lastModified and entityTag are updated - String etag = getHTTPProcessor.entityTagRef.get(); - assertFalse("".equals(etag)); - String lastMod = getHTTPProcessor.lastModifiedRef.get(); - assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(lastMod)); - // ran twice, but got one...which is good - controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); - controller.clearTransferState(); - - files = confDir.listFiles(); - assertEquals(1, files.length); - File file = files[0]; - assertTrue(file.exists()); - Properties props = new Properties(); - FileInputStream fis = new FileInputStream(file); - props.load(fis); - fis.close(); - assertEquals(etag, props.getProperty(GetHTTP.ETAG)); - assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED)); - - ProcessorInitializationContext pic = new MockProcessorInitializationContext(controller.getProcessor(), (MockProcessContext) controller.getProcessContext()); - // init causes read from file - getHTTPProcessor.init(pic); - assertEquals(etag, getHTTPProcessor.entityTagRef.get()); - assertEquals(lastMod, getHTTPProcessor.lastModifiedRef.get()); - controller.run(2); - // ran twice, got none...which is good - controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0); - controller.clearTransferState(); - files = confDir.listFiles(); - assertEquals(1, files.length); - file = files[0]; - assertTrue(file.exists()); - props = new Properties(); - fis = new FileInputStream(file); - props.load(fis); - fis.close(); - assertEquals(etag, props.getProperty(GetHTTP.ETAG)); - assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED)); - - getHTTPProcessor.onRemoved(); - assertFalse(file.exists()); - - // shutdown web service - } finally { - server.shutdownServer(); - } - } @Test public final void testUserAgent() throws Exception { http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java index 2eac3f2..3b70dc4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; @@ -38,7 +39,7 @@ public class TestRouteOnAttribute { @Test public void testInvalidOnMisconfiguredProperty() { final RouteOnAttribute proc = new RouteOnAttribute(); - final MockProcessContext ctx = new MockProcessContext(proc); + final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager()); final ValidationResult validationResult = ctx.setProperty("RouteA", "${a:equals('b')"); // Missing closing brace assertFalse(validationResult.isValid()); } @@ -46,7 +47,7 @@ public class TestRouteOnAttribute { @Test public void testInvalidOnNonBooleanProperty() { final RouteOnAttribute proc = new RouteOnAttribute(); - final MockProcessContext ctx = new MockProcessContext(proc); + final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager()); final ValidationResult validationResult = ctx.setProperty("RouteA", "${a:length()"); // Should be boolean assertFalse(validationResult.isValid()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2af004d..954733f 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,8 @@ language governing permissions and limitations under the License. --> <repositories> <repository> <id>central</id> - <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution --> + <!-- This should be at top, it makes maven try the central repo + first and then others and hence faster dep resolution --> <name>Maven Repository</name> <url>https://repo1.maven.org/maven2</url> <releases> @@ -578,7 +579,7 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> + <artifactId>activemq-broker</artifactId> <version>5.12.1</version> <scope>tests</scope> </dependency> @@ -749,6 +750,27 @@ language governing permissions and limitations under the License. --> <version>1.3.1</version> </dependency> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.7</version> + </dependency> + + <!-- Test Dependencies for testing interactions with ZooKeeper --> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <version>6.8.8</version> + <scope>test</scope> + </dependency> + + <!-- NiFi modules --> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>0.4.2-SNAPSHOT</version> @@ -1283,71 +1305,100 @@ language governing permissions and limitations under the License. --> <!-- Checks for whitespace --> <!-- See http://checkstyle.sf.net/config_whitespace.html --> <module name="FileTabCharacter"> - <property name="eachLine" value="true" /> + <property name="eachLine" + value="true" /> </module> <module name="TreeWalker"> <module name="RegexpSinglelineJava"> - <property name="format" value="\s+$" /> - <property name="message" value="Line has trailing whitespace." /> + <property name="format" + value="\s+$" /> + <property name="message" + value="Line has trailing whitespace." /> </module> <module name="RegexpSinglelineJava"> - <property name="format" value="[@]see\s+[{][@]link" /> - <property name="message" value="Javadoc @see does not need @link: pick one or the other." /> + <property name="format" + value="[@]see\s+[{][@]link" /> + <property name="message" + value="Javadoc @see does not need @link: pick one or the other." /> </module> <module name="OuterTypeFilename" /> <module name="LineLength"> <!-- needs extra, because Eclipse formatter ignores the ending left brace --> <property name="max" value="200" /> - <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> + <property name="ignorePattern" + value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> </module> <module name="AvoidStarImport" /> <module name="UnusedImports"> - <property name="processJavadoc" value="true" /> + <property name="processJavadoc" + value="true" /> </module> <module name="NoLineWrap" /> <module name="LeftCurly"> - <property name="maxLineLength" value="160" /> + <property name="maxLineLength" + value="160" /> </module> <module name="RightCurly" /> <module name="RightCurly"> - <property name="option" value="alone" /> - <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> + <property name="option" + value="alone" /> + <property name="tokens" + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> </module> <module name="SeparatorWrap"> - <property name="tokens" value="DOT" /> - <property name="option" value="nl" /> + <property name="tokens" + value="DOT" /> + <property name="option" + value="nl" /> </module> <module name="SeparatorWrap"> - <property name="tokens" value="COMMA" /> - <property name="option" value="EOL" /> + <property name="tokens" + value="COMMA" /> + <property name="option" + value="EOL" /> </module> <module name="PackageName"> - <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> + <property name="format" + value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> </module> <module name="MethodTypeParameterName"> - <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> + <property name="format" + value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> </module> <module name="MethodParamPad" /> <module name="OperatorWrap"> - <property name="option" value="NL" /> - <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> + <property name="option" + value="NL" /> + <property name="tokens" + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> </module> <module name="AnnotationLocation"> - <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> + <property name="tokens" + value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> </module> <module name="AnnotationLocation"> - <property name="tokens" value="VARIABLE_DEF" /> - <property name="allowSamelineMultipleAnnotations" value="true" /> + <property name="tokens" + value="VARIABLE_DEF" /> + <property + name="allowSamelineMultipleAnnotations" + value="true" /> </module> <module name="NonEmptyAtclauseDescription" /> <module name="JavadocMethod"> - <property name="allowMissingJavadoc" value="true" /> - <property name="allowMissingParamTags" value="true" /> - <property name="allowMissingThrowsTags" value="true" /> - <property name="allowMissingReturnTag" value="true" /> - <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" /> - <property name="allowThrowsTagsForSubclasses" value="true" /> + <property name="allowMissingJavadoc" + value="true" /> + <property name="allowMissingParamTags" + value="true" /> + <property name="allowMissingThrowsTags" + value="true" /> + <property name="allowMissingReturnTag" + value="true" /> + <property name="allowedAnnotations" + value="Override,Test,BeforeClass,AfterClass,Before,After" /> + <property + name="allowThrowsTagsForSubclasses" + value="true" /> </module> <module name="SingleLineJavadoc" /> </module> @@ -1424,12 +1475,12 @@ language governing permissions and limitations under the License. --> </build> </profile> <profile> - <!-- This profile will disable DocLint which performs strict JavaDoc - processing which was introduced in JDK 8. These are technically errors - in the JavaDoc which we need to eventually address. However, if a release - is performed using JDK 8, the JavaDoc generation would fail. By activating - this profile when running on JDK 8 we can ensure the JavaDocs continue - to generate successfully --> + <!-- This profile will disable DocLint which performs strict + JavaDoc processing which was introduced in JDK 8. These are technically errors + in the JavaDoc which we need to eventually address. However, if a release + is performed using JDK 8, the JavaDoc generation would fail. By activating + this profile when running on JDK 8 we can ensure the JavaDocs continue to + generate successfully --> <id>disable-doclint</id> <activation> <jdk>1.8</jdk>
