Repository: nifi-minifi Updated Branches: refs/heads/master 3125b00e2 -> 270a3081d
MINIFI-434 - PullHttpChangeIngestor should preserve security properties This closes #114. Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/270a3081 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/270a3081 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/270a3081 Branch: refs/heads/master Commit: 270a3081d29796ffd3aeb25ac14c18f57725300e Parents: 3125b00 Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Thu Feb 1 19:20:43 2018 +0100 Committer: Aldrin Piri <ald...@apache.org> Committed: Thu Feb 8 10:58:21 2018 -0500 ---------------------------------------------------------------------- .../ingestors/AbstractPullChangeIngestor.java | 7 +-- .../ingestors/PullHttpChangeIngestor.java | 44 +++++++++++++++-- .../PullHttpChangeIngestorSSLTest.java | 1 + .../ingestors/PullHttpChangeIngestorTest.java | 2 - .../PullHttpChangeIngestorCommonTest.java | 52 +++++++++++++++++++- .../minifi/commons/schema/ConfigSchema.java | 4 ++ 6 files changed, 100 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java index 1678f20..deebe90 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java @@ -27,12 +27,10 @@ import java.util.Properties; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - - +import java.util.concurrent.atomic.AtomicReference; public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor { - // 5 minute default pulling period protected static final String DEFAULT_POLLING_PERIOD = "300000"; protected static Logger logger; @@ -40,10 +38,12 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge protected final AtomicInteger pollingPeriodMS = new AtomicInteger(); private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); protected volatile ConfigurationChangeNotifier configurationChangeNotifier; + protected final AtomicReference<Properties> properties = new AtomicReference<>(); @Override public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) { this.configurationChangeNotifier = configurationChangeNotifier; + this.properties.set(properties); } @Override @@ -56,5 +56,6 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge scheduledThreadPoolExecutor.shutdownNow(); } + @Override public abstract void run(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java index 6c8adcc..f7add36 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java @@ -25,11 +25,18 @@ import okhttp3.Request; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; import org.apache.nifi.minifi.commons.schema.common.StringUtil; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -37,6 +44,8 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; + +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; @@ -91,6 +100,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms"; public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator"; public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag"; + public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security"; private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>(); private final AtomicReference<Integer> portReference = new AtomicReference<>(); @@ -101,6 +111,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { private volatile String connectionScheme; private volatile String lastEtag = ""; private volatile boolean useEtag = false; + private volatile boolean overrideSecurity = false; public PullHttpChangeIngestor() { logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class); @@ -144,6 +155,14 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { "the default value of \"false\". It is set to \"" + useEtagString + "\"."); } + final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false"); + if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)){ + overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties); + } else { + throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" + + " or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\"."); + } + httpClientReference.set(null); final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder(); @@ -252,12 +271,31 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor { } ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes()); + ByteBuffer readOnlyNewConfig = null; - if (differentiator.isNew(bodyByteBuffer)) { - logger.debug("New change, notifying listener"); + // checking if some parts of the configuration must be preserved + if(overrideSecurity) { + readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer(); + } else { + logger.debug("Preserving previous security properties..."); + + // get the current security properties from the current configuration file + final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY)); + ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile)); + ConfigSchema currentSchema = configSchema.convert(); + SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties(); - ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer(); + // override the security properties in the pulled configuration with the previous properties + configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate())); + ConfigSchema newSchema = configSchema.convert(); + newSchema.setSecurityProperties(secProps); + + // return the updated configuration preserving the previous security configuration + readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer(); + } + if (differentiator.isNew(readOnlyNewConfig)) { + logger.debug("New change received, notifying listener"); configurationChangeNotifier.notifyListeners(readOnlyNewConfig); logger.debug("Listeners notified"); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java index 195cf60..470b380 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java @@ -75,6 +75,7 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port)); properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost"); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestor = new PullHttpChangeIngestor(); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java index f39fbdf..5bd2187 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java @@ -57,8 +57,6 @@ public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000"); pullHttpChangeIngestor = new PullHttpChangeIngestor(); - - pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier); pullHttpChangeIngestor.setDifferentiator(mockDifferentiator); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java index 229e33d..798e285 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java @@ -17,11 +17,18 @@ package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult; import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator; import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.apache.nifi.util.file.FileUtils; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; @@ -30,12 +37,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; @@ -44,6 +53,8 @@ import java.util.Collections; import java.util.Properties; import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -89,6 +100,7 @@ public abstract class PullHttpChangeIngestorCommonTest { @Test public void testNewUpdate() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); @@ -98,10 +110,31 @@ public abstract class PullHttpChangeIngestorCommonTest { verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer())); } + @Test + public void testSecurityOverride() throws IOException, SchemaLoaderException { + Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "false"); + properties.put(RunMiNiFi.MINIFI_CONFIG_FILE_KEY, "src/test/resources/config.yml"); + properties.put(PATH_KEY, "/config-minimal.yml"); + pullHttpChangeIngestorInit(properties); + when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); + + pullHttpChangeIngestor.run(); + + ArgumentCaptor<ByteBuffer> argument = ArgumentCaptor.forClass(ByteBuffer.class); + verify(testNotifier, Mockito.times(1)).notifyListeners(argument.capture()); + + ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(argument.getValue())); + ConfigSchema newSchema = configSchema.convert(); + + assertNotNull(newSchema.getSecurityProperties().getKeystore()); + assertEquals(newSchema.getProcessGroupSchema().getProcessors().size(), 2); + } @Test public void testNoUpdate() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false); @@ -114,6 +147,7 @@ public abstract class PullHttpChangeIngestorCommonTest { @Test public void testUseEtag() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setLastEtag(""); @@ -135,6 +169,7 @@ public abstract class PullHttpChangeIngestorCommonTest { public void testNewUpdateWithPath() throws IOException { Properties properties = new Properties(); properties.put(PATH_KEY, "/config.yml"); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true); @@ -147,6 +182,7 @@ public abstract class PullHttpChangeIngestorCommonTest { @Test public void testNoUpdateWithPath() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); properties.put(PATH_KEY, "/config.yml"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setUseEtag(false); @@ -160,6 +196,7 @@ public abstract class PullHttpChangeIngestorCommonTest { @Test public void testUseEtagWithPath() throws IOException { Properties properties = new Properties(); + properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true"); properties.put(PATH_KEY, "/config.yml"); pullHttpChangeIngestorInit(properties); pullHttpChangeIngestor.setLastEtag(""); @@ -187,7 +224,6 @@ public abstract class PullHttpChangeIngestorCommonTest { this.pathResponse = pathResponse; } - @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -199,9 +235,10 @@ public abstract class PullHttpChangeIngestorCommonTest { if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){ writeOutput(response, null, 304); } else { - if ("/config.yml".equals(baseRequest.getPathInfo())) { writeOutput(response, pathResponse, 200); + } else if ("/config-minimal.yml".equals(baseRequest.getPathInfo())) { + writeFileOutput(response, new File("src/test/resources/config-minimal.yml"), 200); } else { writeOutput(response, configResponse, 200); } @@ -227,5 +264,16 @@ public abstract class PullHttpChangeIngestorCommonTest { } } + private void writeFileOutput(HttpServletResponse response, File file, int responseCode) throws IOException { + response.setStatus(responseCode); + response.setHeader("ETag", ETAG); + if (file != null) { + response.setContentType("text/plain"); + response.setContentLength((int) file.length()); + response.setCharacterEncoding(StandardCharsets.UTF_8.displayName()); + FileUtils.copyFile(file, response.getOutputStream(), true, true); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java ---------------------------------------------------------------------- diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java index 0b33825..d871ffd 100644 --- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java +++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java @@ -197,6 +197,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta return securityProperties; } + public void setSecurityProperties(SecurityPropertiesSchema securityProperties) { + this.securityProperties = securityProperties; + } + public ProcessGroupSchema getProcessGroupSchema() { return processGroupSchema; }