http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 0000000,4fae6ed..4a74416 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@@ -1,0 -1,206 +1,208 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + -import org.apache.nifi.processors.standard.DetectDuplicate; + import java.io.File; + import java.io.IOException; + import java.io.OutputStream; + import java.nio.charset.StandardCharsets; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + ++import org.apache.commons.lang3.SerializationException; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.controller.AbstractControllerService; + import org.apache.nifi.distributed.cache.client.Deserializer; + import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; + import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; + import org.apache.nifi.distributed.cache.client.Serializer; + import org.apache.nifi.reporting.InitializationException; + import org.apache.nifi.util.MockControllerServiceInitializationContext; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; - -import org.apache.commons.lang3.SerializationException; + import org.junit.Test; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class TestDetectDuplicate { + + private static Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug"); + LOGGER = LoggerFactory.getLogger(TestListenUDP.class); + } + + @Test + public void testDuplicate() throws InitializationException { - + TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map<String, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); + Map<String, String> props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); ++ runner.enableControllerService(client); ++ + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.clearTransferState(); + client.exists = true; + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + + @Test + public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException { + + TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map<String, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); ++ runner.enableControllerService(client); ++ + Map<String, String> props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); ++ + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.clearTransferState(); + client.exists = true; + Thread.sleep(3000); + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + + private DistributedMapCacheClientImpl createClient() throws InitializationException { + + final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); + MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + return client; + } + + static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { + + boolean exists = false; + private Object cacheValue; + + @Override + public void close() throws IOException { + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + } + + @Override + protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> props = new ArrayList<>(); + props.add(DistributedMapCacheClientService.HOSTNAME); + props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT); + props.add(DistributedMapCacheClientService.PORT); + props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE); + return props; + } + + @Override + public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + if (exists) { + return false; + } + + cacheValue = value; + return true; + } + + @Override + public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, + Deserializer<V> valueDeserializer) throws IOException { + if (exists) { + return (V) cacheValue; + } + cacheValue = value; + return null; + } + + @Override + public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException { + return exists; + } + + @Override + public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { + return null; + } + + @Override + public <K> boolean remove(K key, Serializer<K> serializer) throws IOException { + exists = false; + return true; + } + } + + private static class StringSerializer implements Serializer<String> { + + @Override + public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { + output.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + + private static void deleteRecursively(final File dataFile) throws IOException { + if (dataFile == null || !dataFile.exists()) { + return; + } + + final File[] children = dataFile.listFiles(); + for (final File child : children) { + if (child.isDirectory()) { + deleteRecursively(child); + } else { + for (int i = 0; i < 100 && child.exists(); i++) { + child.delete(); + } + + if (child.exists()) { + throw new IOException("Could not delete " + dataFile.getAbsolutePath()); + } + } + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java index 0000000,63bdcf8..f52b212 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java @@@ -1,0 -1,354 +1,340 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; + + import java.io.File; + import java.io.FileInputStream; + import java.util.HashMap; + import java.util.Map; + import java.util.Properties; + + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.reporting.InitializationException; + import org.apache.nifi.ssl.SSLContextService; + import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.MockControllerServiceInitializationContext; + import org.apache.nifi.util.MockFlowFile; + import org.apache.nifi.util.MockProcessContext; + import org.apache.nifi.util.MockProcessorInitializationContext; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; + import org.eclipse.jetty.servlet.ServletHandler; + import org.junit.AfterClass; + import org.junit.Assert; + import org.junit.BeforeClass; + import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + + /** + * @author unattributed + * + */ + public class TestGetHTTP { + - private static Logger LOGGER; + private TestRunner controller; + + @BeforeClass + public static void before() { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.GetHTTP", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestGetHTTP", "debug"); - LOGGER = LoggerFactory.getLogger(TestGetHTTP.class); + File confDir = new File("conf"); + if (!confDir.exists()) { + confDir.mkdir(); + } + } + + @AfterClass + public static void after() { + File confDir = new File("conf"); + assertTrue(confDir.exists()); + File[] files = confDir.listFiles(); + assertTrue(files.length > 0); + for (File file : files) { + assertTrue("Failed to delete " + file.getName(), file.delete()); + } + assertTrue(confDir.delete()); + } + - private static Map<String, String> createSslProperties() { - Map<String, String> map = new HashMap<String, String>(); - map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - return map; - } + + @Test + public final void testContentModified() throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(RESTServiceContentModified.class, "/*"); + + // create the service + TestServer server = new TestServer(); + server.addHandler(handler); + + try { + server.startServer(); + + // this is the base url with the random port + String destination = server.getUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.URL, destination); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + + GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor(); + assertEquals("", getHTTPProcessor.entityTagRef.get()); + assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get()); + controller.run(2); + + // verify the lastModified and entityTag are updated + assertFalse("".equals(getHTTPProcessor.entityTagRef.get())); + assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(getHTTPProcessor.lastModifiedRef.get())); + // ran twice, but got one...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + + // verify remote.source flowfile attribute + controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0).assertAttributeEquals("gethttp.remote.source", "localhost"); + + controller.clearTransferState(); + + // turn off checking for etag and lastModified + RESTServiceContentModified.IGNORE_ETAG = true; + RESTServiceContentModified.IGNORE_LAST_MODIFIED = true; + controller.run(2); + // ran twice, got two...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 2); + controller.clearTransferState(); + + // turn on checking for etag + RESTServiceContentModified.IGNORE_ETAG = false; + controller.run(2); + // ran twice, got 0...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0); + + // turn on checking for lastModified, but off for etag + RESTServiceContentModified.IGNORE_LAST_MODIFIED = false; + RESTServiceContentModified.IGNORE_ETAG = true; + controller.run(2); + // ran twice, got 0...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0); + + // turn off checking for lastModified, turn on checking for etag, but change the value + RESTServiceContentModified.IGNORE_LAST_MODIFIED = true; + RESTServiceContentModified.IGNORE_ETAG = false; + RESTServiceContentModified.ETAG = 1; + controller.run(2); + // ran twice, got 1...but should have new cached etag + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + assertEquals("1", getHTTPProcessor.entityTagRef.get()); + controller.clearTransferState(); + + // turn off checking for Etag, turn on checking for lastModified, but change value + RESTServiceContentModified.IGNORE_LAST_MODIFIED = false; + RESTServiceContentModified.IGNORE_ETAG = true; + RESTServiceContentModified.modificationDate = System.currentTimeMillis() / 1000 * 1000 + 5000; + String lastMod = getHTTPProcessor.lastModifiedRef.get(); + controller.run(2); + // ran twice, got 1...but should have new cached etag + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + assertFalse(lastMod.equals(getHTTPProcessor.lastModifiedRef.get())); + controller.clearTransferState(); + + // shutdown web service + } finally { + server.shutdownServer(); + } + } + + @Test + public void testPersistEtagLastMod() throws Exception { + // delete the config file + File confDir = new File("conf"); + File[] files = confDir.listFiles(); + for (File file : files) { + assertTrue("Failed to delete " + file.getName(), file.delete()); + } + + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(RESTServiceContentModified.class, "/*"); + + // create the service + TestServer server = new TestServer(); + server.addHandler(handler); + + try { + server.startServer(); + + // get the server url + String destination = server.getUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.URL, destination); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + + GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor(); + + assertEquals("", getHTTPProcessor.entityTagRef.get()); + assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get()); + controller.run(2); + + // verify the lastModified and entityTag are updated + String etag = getHTTPProcessor.entityTagRef.get(); + assertFalse("".equals(etag)); + String lastMod = getHTTPProcessor.lastModifiedRef.get(); + assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(lastMod)); + // ran twice, but got one...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + controller.clearTransferState(); + + files = confDir.listFiles(); + assertEquals(1, files.length); + File file = files[0]; + assertTrue(file.exists()); + Properties props = new Properties(); + FileInputStream fis = new FileInputStream(file); + props.load(fis); + fis.close(); + assertEquals(etag, props.getProperty(GetHTTP.ETAG)); + assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED)); + + ProcessorInitializationContext pic = new MockProcessorInitializationContext(controller.getProcessor(), + (MockProcessContext) controller.getProcessContext()); + // init causes read from file + getHTTPProcessor.init(pic); + assertEquals(etag, getHTTPProcessor.entityTagRef.get()); + assertEquals(lastMod, getHTTPProcessor.lastModifiedRef.get()); + controller.run(2); + // ran twice, got none...which is good + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0); + controller.clearTransferState(); + files = confDir.listFiles(); + assertEquals(1, files.length); + file = files[0]; + assertTrue(file.exists()); + props = new Properties(); + fis = new FileInputStream(file); + props.load(fis); + fis.close(); + assertEquals(etag, props.getProperty(GetHTTP.ETAG)); + assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED)); + + // shutdown web service + } finally { + server.shutdownServer(); + } + } + + @Test + public final void testUserAgent() throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(UserAgentTestingServlet.class, "/*"); + + // create the service + TestServer server = new TestServer(); + server.addHandler(handler); + + try { + server.startServer(); + + String destination = server.getUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.URL, destination); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + + controller.run(); + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0); + + controller.setProperty(GetHTTP.USER_AGENT, "testUserAgent"); + controller.run(); + controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1); + + // shutdown web service + } finally { + server.shutdownServer(); + } + } + + private Map<String, String> getSslProperties() { + Map<String, String> props = new HashMap<String, String>(); + props.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + props.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + props.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return props; + } + + private void useSSLContextService() { + final SSLContextService service = new StandardSSLContextService(); + try { + controller.addControllerService("ssl-service", service, getSslProperties()); ++ controller.enableControllerService(service); + } catch (InitializationException ex) { + ex.printStackTrace(); + Assert.fail("Could not create SSL Context Service"); + } + + controller.setProperty(GetHTTP.SSL_CONTEXT_SERVICE, "ssl-service"); + } + + @Test + public final void testSecure() throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(HelloWorldServlet.class, "/*"); + + // create the service + TestServer server = new TestServer(getSslProperties()); + server.addHandler(handler); + + try { + server.startServer(); + + String destination = server.getSecureUrl(); + + // set up NiFi mock controller + controller = TestRunners.newTestRunner(GetHTTP.class); + useSSLContextService(); + + controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs"); + controller.setProperty(GetHTTP.URL, destination); + controller.setProperty(GetHTTP.FILENAME, "testFile"); + controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json"); + + controller.run(); + controller.assertAllFlowFilesTransferred(GetHTTP.REL_SUCCESS, 1); + final MockFlowFile mff = controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0); + mff.assertContentEquals("Hello, World!"); + } finally { + server.shutdownServer(); + } + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index 0000000,b98ba13..e5950cd mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@@ -1,0 -1,593 +1,595 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + + import static org.junit.Assert.*; + + import java.io.IOException; + import java.io.PrintWriter; + import java.io.UnsupportedEncodingException; + import java.nio.charset.StandardCharsets; + import java.text.SimpleDateFormat; + import java.util.Date; + import java.util.HashMap; + import java.util.Locale; + import java.util.Map; + + import javax.servlet.ServletException; + import javax.servlet.http.HttpServletRequest; + import javax.servlet.http.HttpServletResponse; + + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.processors.standard.InvokeHTTP.Config; + import org.apache.nifi.ssl.StandardSSLContextService; + import org.apache.nifi.util.MockFlowFile; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; + import org.eclipse.jetty.server.Handler; + import org.eclipse.jetty.server.Request; + import org.eclipse.jetty.server.handler.AbstractHandler; + import org.junit.After; + import org.junit.AfterClass; + import org.junit.Assert; + import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Test; + + public class TestInvokeHTTP { + + private static Map<String, String> sslProperties; + private static TestServer server; + private static String url; + + private TestRunner runner; + + @BeforeClass + public static void beforeClass() throws Exception { + // useful for verbose logging output + // don't commit this with this property enabled, or any 'mvn test' will be really verbose + // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug"); + + // create the SSL properties, which basically store keystore / trustore information + // this is used by the StandardSSLContextService and the Jetty Server + sslProperties = createSslProperties(); + + // create a Jetty server on a random port + server = createServer(); + server.startServer(); + + // this is the base url with the random port + url = server.getSecureUrl(); + } + + @AfterClass + public static void afterClass() throws Exception { + server.shutdownServer(); + } + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(InvokeHTTP.class); - runner.addControllerService("ssl-context", new StandardSSLContextService(), sslProperties); ++ final StandardSSLContextService sslService = new StandardSSLContextService(); ++ runner.addControllerService("ssl-context", sslService, sslProperties); ++ runner.enableControllerService(sslService); + runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + + server.clearHandlers(); + } + + @After + public void after() { + runner.shutdown(); + } + + private void addHandler(Handler handler) { + server.addHandler(handler); + } + + @Test + public void testDateGeneration() throws Exception { + DateHandler dh = new DateHandler(); + addHandler(dh); + + runner.setProperty(Config.PROP_URL, url); + createFlowFiles(runner); + runner.run(); + + // extract the date string sent to the server + // and store it as a java.util.Date + SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US); + Date date = sdf.parse(dh.dateString); + + // calculate the difference between the date string sent by the client and + // the current system time -- these should be within a second or two + // (just enough time to run the test). + // + // If the difference is more like in hours, it's likely that a timezone + // conversion caused a problem. + long diff = Math.abs(System.currentTimeMillis() - date.getTime()); + long threshold = 15000; // 15 seconds + if (diff > threshold) { + fail("Difference (" + diff + ") was greater than threshold (" + threshold + ")"); + } + System.out.println("diff: " + diff); + } + + @Test + public void test200() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/200"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + //expected in request status.code and status.message + //original flow file (+attributes)?????????? + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + //expected in response + //status code, status message, all headers from server response --> ff attributes + //server response message body into payload of ff + //should not contain any original ff attributes + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); + final String expected1 = "/status/200"; + Assert.assertEquals(expected1, actual1); + + } + + @Test + public void test500() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/500"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 1); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + //expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + bundle.assertAttributeEquals(Config.STATUS_CODE, "500"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Server Error"); + bundle.assertAttributeEquals(Config.RESPONSE_BODY, "/status/500"); + + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + } + + @Test + public void test300() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/302"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 1); + runner.assertTransferCount(Config.REL_FAILURE, 0); + //getMyFlowFiles(); + //expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + + bundle.assertAttributeEquals(Config.STATUS_CODE, "302"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Found"); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + } + + @Test + public void test304() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/304"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 1); + runner.assertTransferCount(Config.REL_FAILURE, 0); + //getMyFlowFiles(); + //expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + + bundle.assertAttributeEquals(Config.STATUS_CODE, "304"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Not Modified"); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + } + + @Test + public void test400() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/400"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 1); + runner.assertTransferCount(Config.REL_FAILURE, 0); + //getMyFlowFiles(); + //expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + + bundle.assertAttributeEquals(Config.STATUS_CODE, "400"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Bad Request"); + bundle.assertAttributeEquals(Config.RESPONSE_BODY, "/status/400"); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + } + + @Test + public void test412() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_URL, url + "/status/412"); + runner.setProperty(Config.PROP_METHOD, "GET"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 1); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + //expected in response + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + + bundle.assertAttributeEquals(Config.STATUS_CODE, "412"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Precondition Failed"); + bundle.assertAttributeEquals(Config.RESPONSE_BODY, "/status/412"); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + } + + @Test + public void testHead() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(Config.PROP_METHOD, "HEAD"); + runner.setProperty(Config.PROP_URL, url + "/status/200"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + bundle1.assertContentEquals("".getBytes("UTF-8")); + bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain"); + final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); + final String expected1 = ""; + Assert.assertEquals(expected1, actual1); + } + + @Test + public void testPost() throws Exception { + addHandler(new PostHandler()); + + runner.setProperty(Config.PROP_METHOD, "POST"); + runner.setProperty(Config.PROP_URL, url + "/post"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + bundle1.assertContentEquals("".getBytes("UTF-8")); + bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeNotExists("Content-Type"); + + final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); + final String expected1 = ""; + Assert.assertEquals(expected1, actual1); + } + + @Test + public void testPut() throws Exception { + addHandler(new PostHandler()); + + runner.setProperty(Config.PROP_METHOD, "PUT"); + runner.setProperty(Config.PROP_URL, url + "/post"); + + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 1); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 0); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + bundle1.assertContentEquals("".getBytes("UTF-8")); + bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeNotExists("Content-Type"); + + final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); + final String expected1 = ""; + Assert.assertEquals(expected1, actual1); + } + + @Test + public void testConnectFailBadPort() throws Exception { + addHandler(new GetOrHeadHandler()); + + // this is the bad urls + String badurlport = "https://localhost:" + 445; + + runner.setProperty(Config.PROP_URL, badurlport + "/doesnotExist"); + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 1); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0); + + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + } + + // @Test + // public void testGetFlowfileAttributes() throws IOException { + // Map<String, List<String>> input = new HashMap<>(); + // input.put("A", Arrays.asList("1")); + // input.put("B", Arrays.asList("1", "2", "3")); + // input.put("C", new ArrayList<String>()); + // input.put("D", null); + // + // Map<String, String> expected = new HashMap<>(); + // expected.put(Config.STATUS_CODE, "200"); + // expected.put(Config.STATUS_MESSAGE, "OK"); + // expected.put(Config.STATUS_LINE, "HTTP/1.1 200 OK"); + // expected.put("A", "1"); + // expected.put("B", "1, 2, 3"); + // + // URL url = new URL("file:/dev/null"); + // HttpURLConnection conn = new MockHttpURLConnection(url, 200, "OK", input); + // + // Map<String, String> actual = processor.getAttributesFromHeaders(conn); + // + // assertEquals(expected, actual); + // } + // @Test + // public void testCsv() { + // // null input should return an empty string + // assertEquals("", processor.csv(null)); + // + // // empty collection returns empty string + // assertEquals("", processor.csv(new ArrayList<String>())); + // + // // pretty normal checks + // assertEquals("1", processor.csv(Arrays.asList("1"))); + // assertEquals("1, 2", processor.csv(Arrays.asList("1", "2"))); + // assertEquals("1, 2, 3", processor.csv(Arrays.asList("1", "2", "3"))); + // + // // values should be trimmed + // assertEquals("1, 2, 3", processor.csv(Arrays.asList(" 1", " 2 ", "3 "))); + // + // // empty values should be skipped + // assertEquals("1, 3", processor.csv(Arrays.asList("1", "", "3"))); + // + // // whitespace values should be skipped + // assertEquals("1, 3", processor.csv(Arrays.asList("1", " ", "3"))); + // + // // this (mis)behavior is currently expected, embedded comma delimiters are not escaped + // // note the embedded unescaped comma in the "1, " value + // assertEquals("1,, 2, 3", processor.csv(Arrays.asList("1, ", "2", "3"))); + // } + @Test + public void testConnectFailBadHost() throws Exception { + addHandler(new GetOrHeadHandler()); + + String badurlhost = "https://localhOOst:" + 445; + + runner.setProperty(Config.PROP_URL, badurlhost + "/doesnotExist"); + createFlowFiles(runner); + + runner.run(); + runner.assertTransferCount(Config.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(Config.REL_SUCCESS_RESP, 0); + runner.assertTransferCount(Config.REL_RETRY, 0); + runner.assertTransferCount(Config.REL_NO_RETRY, 0); + runner.assertTransferCount(Config.REL_FAILURE, 1); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0); + + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + } + + private static Map<String, String> createSslProperties() { + Map<String, String> map = new HashMap<String, String>(); + map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return map; + } + + private static TestServer createServer() throws IOException { + return new TestServer(sslProperties); + } + + private static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + attributes.put("Foo", "Bar"); + testRunner.enqueue("Hello".getBytes("UTF-8"), attributes); + + } + + private static class PostHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + + baseRequest.setHandled(true); + + assertEquals("/post", target); + + String body = request.getReader().readLine(); + assertEquals("Hello", body); + + } + } + + private static class GetOrHeadHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + int status = Integer.valueOf(target.substring("/status".length() + 1)); + response.setStatus(status); + + response.setContentType("text/plain"); + response.setContentLength(target.length()); + + if ("GET".equalsIgnoreCase(request.getMethod())) { + try (PrintWriter writer = response.getWriter()) { + writer.print(target); + writer.flush(); + } + } + + } + + } + + private static class DateHandler extends AbstractHandler { + + private String dateString; + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + dateString = request.getHeader("Date"); + + response.setStatus(200); + response.setContentType("text/plain"); + response.getWriter().println("Way to go!"); + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java index 0000000,0f962d0..19d95b3 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@@ -1,0 -1,107 +1,107 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.distributed.cache.server; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.List; + ++import org.apache.nifi.annotation.lifecycle.OnShutdown; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.controller.AbstractControllerService; + import org.apache.nifi.controller.ConfigurationContext; + import org.apache.nifi.controller.annotation.OnConfigured; -import org.apache.nifi.processor.annotation.OnShutdown; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.ssl.SSLContextService; + + public abstract class DistributedCacheServer extends AbstractControllerService { + public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; + public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; + public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("The port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description( + "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure") + .required(false) + .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) + .build(); + public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Cache Entries") + .description("The maximum number of cache entries that the cache can hold") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() + .name("Eviction Strategy") + .description("Determines which strategy should be used to evict values from the cache to make room for new entries") + .required(true) + .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) + .defaultValue(EVICTION_STRATEGY_LFU) + .build(); + public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() + .name("Persistence Directory") + .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") + .required(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) + .build(); + + private volatile CacheServer cacheServer; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(PORT); + properties.add(MAX_CACHE_ENTRIES); + properties.add(EVICTION_POLICY); + properties.add(PERSISTENCE_PATH); + properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues( + getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build()); + return properties; + } + + @OnConfigured + public void startServer(final ConfigurationContext context) throws IOException { + if (cacheServer == null) { + cacheServer = createCacheServer(context); + cacheServer.start(); + } + } + + @OnShutdown + public void shutdownServer() throws IOException { + if (cacheServer != null) { + cacheServer.stop(); + } + cacheServer = null; + } + + @Override + protected void finalize() throws Throwable { + shutdownServer(); + } + + protected abstract CacheServer createCacheServer(ConfigurationContext context); + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java index 0000000,d7aae16..68f83d4 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java @@@ -1,0 -1,354 +1,354 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.ssl; + + import java.io.File; + import java.net.MalformedURLException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.List; + import java.util.Map; + + import javax.net.ssl.SSLContext; + ++import org.apache.nifi.annotation.lifecycle.OnEnabled; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.controller.AbstractControllerService; + import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.annotation.OnConfigured; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.reporting.InitializationException; + import org.apache.nifi.security.util.CertificateUtils; + import org.apache.nifi.security.util.KeystoreType; + import org.apache.nifi.security.util.SslContextFactory; + + public class StandardSSLContextService extends AbstractControllerService implements SSLContextService { + + public static final String STORE_TYPE_JKS = "JKS"; + public static final String STORE_TYPE_PKCS12 = "PKCS12"; + + public static final PropertyDescriptor TRUSTSTORE = new PropertyDescriptor.Builder() + .name("Truststore Filename") + .description("The fully-qualified filename of the Truststore") + .defaultValue(null) + .addValidator(createFileExistsAndReadableValidator()) + .sensitive(false) + .build(); + public static final PropertyDescriptor TRUSTSTORE_TYPE = new PropertyDescriptor.Builder() + .name("Truststore Type") + .description("The Type of the Truststore. Either JKS or PKCS12") + .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue(STORE_TYPE_JKS) + .sensitive(false) + .build(); + public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder() + .name("Truststore Password") + .description("The password for the Truststore") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor KEYSTORE = new PropertyDescriptor.Builder() + .name("Keystore Filename") + .description("The fully-qualified filename of the Keystore") + .defaultValue(null) + .addValidator(createFileExistsAndReadableValidator()) + .sensitive(false) + .build(); + public static final PropertyDescriptor KEYSTORE_TYPE = new PropertyDescriptor.Builder() + .name("Keystore Type") + .description("The Type of the Keystore") + .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue(STORE_TYPE_JKS) + .sensitive(false) + .build(); + public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder() + .name("Keystore Password") + .defaultValue(null) + .description("The password for the Keystore") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + List<PropertyDescriptor> props = new ArrayList<>(); + props.add(KEYSTORE); + props.add(KEYSTORE_PASSWORD); + props.add(KEYSTORE_TYPE); + props.add(TRUSTSTORE); + props.add(TRUSTSTORE_PASSWORD); + props.add(TRUSTSTORE_TYPE); + properties = Collections.unmodifiableList(props); + } + private ConfigurationContext configContext; + - @OnConfigured ++ @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + configContext = context; + + final Collection<ValidationResult> results = new ArrayList<>(); + results.addAll(validateStore(context.getProperties(), KeystoreValidationGroup.KEYSTORE)); + results.addAll(validateStore(context.getProperties(), KeystoreValidationGroup.TRUSTSTORE)); + + if (!results.isEmpty()) { + final StringBuilder sb = new StringBuilder(this + " is not valid due to:"); + for (final ValidationResult result : results) { + sb.append("\n").append(result.toString()); + } + throw new InitializationException(sb.toString()); + } + + if (countNulls(context.getProperty(KEYSTORE).getValue(), + context.getProperty(KEYSTORE_PASSWORD).getValue(), + context.getProperty(KEYSTORE_TYPE).getValue(), + context.getProperty(TRUSTSTORE).getValue(), + context.getProperty(TRUSTSTORE_PASSWORD).getValue(), + context.getProperty(TRUSTSTORE_TYPE).getValue()) >= 4) { + throw new InitializationException(this + " does not have the KeyStore or the TrustStore populated"); + } + + // verify that the filename, password, and type match + createSSLContext(ClientAuth.REQUIRED); + } + + private static Validator createFileExistsAndReadableValidator() { + return new Validator() { + // Not using the FILE_EXISTS_VALIDATOR because the default is to + // allow expression language + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + final String substituted; + try { + substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid Expression Language value: " + e.getMessage()) + .build(); + } + + final File file = new File(substituted); + final boolean valid = file.exists() && file.canRead(); + final String explanation = valid ? null : "File " + file + " does not exist or cannot be read"; + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(valid) + .explanation(explanation) + .build(); + } + }; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Collection<ValidationResult> results = new ArrayList<>(); + results.addAll(validateStore(validationContext.getProperties(), KeystoreValidationGroup.KEYSTORE)); + results.addAll(validateStore(validationContext.getProperties(), KeystoreValidationGroup.TRUSTSTORE)); + + if (countNulls(validationContext.getProperty(KEYSTORE).getValue(), + validationContext.getProperty(KEYSTORE_PASSWORD).getValue(), + validationContext.getProperty(KEYSTORE_TYPE).getValue(), + validationContext.getProperty(TRUSTSTORE).getValue(), + validationContext.getProperty(TRUSTSTORE_PASSWORD).getValue(), + validationContext.getProperty(TRUSTSTORE_TYPE).getValue()) + >= 4) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName() + " : " + getIdentifier()) + .valid(false) + .explanation("Does not have the KeyStore or the TrustStore populated") + .build()); + } + if (results.isEmpty()) { + // verify that the filename, password, and type match + try { + createSSLContext(ClientAuth.REQUIRED); + } catch (ProcessException e) { + results.add(new ValidationResult.Builder() + .subject(getClass().getSimpleName() + " : " + getIdentifier()) + .valid(false) + .explanation(e.getMessage()) + .build()); + } + } + return results; + } + + @Override + public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException { + try { + final String keystoreFile = configContext.getProperty(KEYSTORE).getValue(); + if (keystoreFile == null) { + return SslContextFactory.createTrustSslContext( + configContext.getProperty(TRUSTSTORE).getValue(), + configContext.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(), + configContext.getProperty(TRUSTSTORE_TYPE).getValue()); + } + final String truststoreFile = configContext.getProperty(TRUSTSTORE).getValue(); + if (truststoreFile == null) { + return SslContextFactory.createSslContext( + configContext.getProperty(KEYSTORE).getValue(), + configContext.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray(), + configContext.getProperty(KEYSTORE_TYPE).getValue()); + } + + return SslContextFactory.createSslContext( + configContext.getProperty(KEYSTORE).getValue(), + configContext.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray(), + configContext.getProperty(KEYSTORE_TYPE).getValue(), + configContext.getProperty(TRUSTSTORE).getValue(), + configContext.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(), + configContext.getProperty(TRUSTSTORE_TYPE).getValue(), + org.apache.nifi.security.util.SslContextFactory.ClientAuth.valueOf(clientAuth.name())); + } catch (final Exception e) { + throw new ProcessException(e); + } + } + + @Override + public String getTrustStoreFile() { + return configContext.getProperty(TRUSTSTORE).getValue(); + } + + @Override + public String getTrustStoreType() { + return configContext.getProperty(TRUSTSTORE_TYPE).getValue(); + } + + @Override + public String getTrustStorePassword() { + return configContext.getProperty(TRUSTSTORE_PASSWORD).getValue(); + } + + @Override + public boolean isTrustStoreConfigured() { + return getTrustStoreFile() != null && getTrustStorePassword() != null && getTrustStoreType() != null; + } + + @Override + public String getKeyStoreFile() { + return configContext.getProperty(KEYSTORE).getValue(); + } + + @Override + public String getKeyStoreType() { + return configContext.getProperty(KEYSTORE_TYPE).getValue(); + } + + @Override + public String getKeyStorePassword() { + return configContext.getProperty(KEYSTORE_PASSWORD).getValue(); + } + + @Override + public boolean isKeyStoreConfigured() { + return getKeyStoreFile() != null && getKeyStorePassword() != null && getKeyStoreType() != null; + } + + private static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties, + final KeystoreValidationGroup keyStoreOrTrustStore) { + final Collection<ValidationResult> results = new ArrayList<>(); + + final String filename; + final String password; + final String type; + + if (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) { + filename = properties.get(KEYSTORE); + password = properties.get(KEYSTORE_PASSWORD); + type = properties.get(KEYSTORE_TYPE); + } else { + filename = properties.get(TRUSTSTORE); + password = properties.get(TRUSTSTORE_PASSWORD); + type = properties.get(TRUSTSTORE_TYPE); + } + + final String keystoreDesc = (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) ? "Keystore" : "Truststore"; + + final int nulls = countNulls(filename, password, type); + if (nulls != 3 && nulls != 0) { + results.add(new ValidationResult.Builder().valid(false).explanation("Must set either 0 or 3 properties for " + keystoreDesc) + .subject(keystoreDesc + " Properties").build()); + } else if (nulls == 0) { + // all properties were filled in. + final File file = new File(filename); + if (!file.exists() || !file.canRead()) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(keystoreDesc + " Properties") + .explanation("Cannot access file " + file.getAbsolutePath()) + .build()); + } else { + try { + final boolean storeValid = CertificateUtils + .isStoreValid(file.toURI().toURL(), KeystoreType.valueOf(type), password.toCharArray()); + if (!storeValid) { + results.add(new ValidationResult.Builder() + .subject(keystoreDesc + " Properties") + .valid(false) + .explanation("Invalid KeyStore Password or Type specified for file " + filename) + .build()); + } + } catch (MalformedURLException e) { + results.add(new ValidationResult.Builder() + .subject(keystoreDesc + " Properties") + .valid(false) + .explanation("Malformed URL from file: " + e) + .build()); + } + } + } + + return results; + } + + private static int countNulls(Object... objects) { + int count = 0; + for (final Object x : objects) { + if (x == null) { + count++; + } + } + + return count; + } + + public static enum KeystoreValidationGroup { + + KEYSTORE, TRUSTSTORE + } + + @Override + public String toString() { + return "SSLContextService[id=" + getIdentifier() + "]"; + } + }
