Repository: nifi-minifi
Updated Branches:
  refs/heads/master 3125b00e2 -> 270a3081d


MINIFI-434 - PullHttpChangeIngestor should preserve security properties

This closes #114.

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/270a3081
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/270a3081
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/270a3081

Branch: refs/heads/master
Commit: 270a3081d29796ffd3aeb25ac14c18f57725300e
Parents: 3125b00
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Thu Feb 1 19:20:43 2018 +0100
Committer: Aldrin Piri <ald...@apache.org>
Committed: Thu Feb 8 10:58:21 2018 -0500

----------------------------------------------------------------------
 .../ingestors/AbstractPullChangeIngestor.java   |  7 +--
 .../ingestors/PullHttpChangeIngestor.java       | 44 +++++++++++++++--
 .../PullHttpChangeIngestorSSLTest.java          |  1 +
 .../ingestors/PullHttpChangeIngestorTest.java   |  2 -
 .../PullHttpChangeIngestorCommonTest.java       | 52 +++++++++++++++++++-
 .../minifi/commons/schema/ConfigSchema.java     |  4 ++
 6 files changed, 100 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
index 1678f20..deebe90 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
@@ -27,12 +27,10 @@ import java.util.Properties;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
-
+import java.util.concurrent.atomic.AtomicReference;
 
 public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
 
-
     // 5 minute default pulling period
     protected static final String DEFAULT_POLLING_PERIOD = "300000";
     protected static Logger logger;
@@ -40,10 +38,12 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge
     protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
     protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
+    protected final AtomicReference<Properties> properties = new AtomicReference<>();
 
     @Override
     public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
         this.configurationChangeNotifier = configurationChangeNotifier;
+        this.properties.set(properties);
     }
 
     @Override
@@ -56,5 +56,6 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge
         scheduledThreadPoolExecutor.shutdownNow();
     }
 
+    @Override
     public abstract void run();
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
index 6c8adcc..f7add36 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -25,11 +25,18 @@ import okhttp3.Request;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
 import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
 import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
@@ -37,6 +44,8 @@ import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
+
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -91,6 +100,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms";
     public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
     public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
+    public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY + ".override.security";
 
     private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
     private final AtomicReference<Integer> portReference = new AtomicReference<>();
@@ -101,6 +111,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
     private volatile String connectionScheme;
     private volatile String lastEtag = "";
     private volatile boolean useEtag = false;
+    private volatile boolean overrideSecurity = false;
 
     public PullHttpChangeIngestor() {
         logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
@@ -144,6 +155,14 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
                     "the default value of \"false\". It is set to \"" + useEtagString + "\".");
         }
 
+        final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
+        if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)){
+            overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
+        } else {
+            throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
+                    " or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
+        }
+
         httpClientReference.set(null);
 
         final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
@@ -252,12 +271,31 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
             }
 
             ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
+            ByteBuffer readOnlyNewConfig = null;
 
-            if (differentiator.isNew(bodyByteBuffer)) {
-                logger.debug("New change, notifying listener");
+            // checking if some parts of the configuration must be preserved
+            if(overrideSecurity) {
+                readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
+            } else {
+                logger.debug("Preserving previous security properties...");
+
+                // get the current security properties from the current configuration file
+                final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY));
+                ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile));
+                ConfigSchema currentSchema = configSchema.convert();
+                SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties();
 
-                ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
+                // override the security properties in the pulled configuration with the previous properties
+                configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate()));
+                ConfigSchema newSchema = configSchema.convert();
+                newSchema.setSecurityProperties(secProps);
+
+                // return the updated configuration preserving the previous security configuration
+                readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer();
+            }
 
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                logger.debug("New change received, notifying listener");
                 configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
                 logger.debug("Listeners notified");
             } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
index 195cf60..470b380 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
@@ -75,6 +75,7 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT
         port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
         properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
         properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
 
         pullHttpChangeIngestor = new PullHttpChangeIngestor();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
index f39fbdf..5bd2187 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
@@ -57,8 +57,6 @@ public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest
         properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
 
         pullHttpChangeIngestor = new PullHttpChangeIngestor();
-
-
         pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
         pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
index 229e33d..798e285 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
@@ -17,11 +17,18 @@
 
 package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
 
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
 import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
 import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.apache.nifi.util.file.FileUtils;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -30,12 +37,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
@@ -44,6 +53,8 @@ import java.util.Collections;
 import java.util.Properties;
 
 import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -89,6 +100,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
     @Test
     public void testNewUpdate() throws IOException {
         Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setUseEtag(false);
         when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
@@ -98,10 +110,31 @@ public abstract class PullHttpChangeIngestorCommonTest {
         verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
     }
 
+    @Test
+    public void testSecurityOverride() throws IOException, SchemaLoaderException {
+        Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "false");
+        properties.put(RunMiNiFi.MINIFI_CONFIG_FILE_KEY, "src/test/resources/config.yml");
+        properties.put(PATH_KEY, "/config-minimal.yml");
+        pullHttpChangeIngestorInit(properties);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        ArgumentCaptor<ByteBuffer> argument = ArgumentCaptor.forClass(ByteBuffer.class);
+        verify(testNotifier, Mockito.times(1)).notifyListeners(argument.capture());
+
+        ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(argument.getValue()));
+        ConfigSchema newSchema = configSchema.convert();
+
+        assertNotNull(newSchema.getSecurityProperties().getKeystore());
+        assertEquals(newSchema.getProcessGroupSchema().getProcessors().size(), 2);
+    }
 
     @Test
     public void testNoUpdate() throws IOException {
         Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setUseEtag(false);
         when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
@@ -114,6 +147,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
     @Test
     public void testUseEtag() throws IOException {
         Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setLastEtag("");
 
@@ -135,6 +169,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
     public void testNewUpdateWithPath() throws IOException {
         Properties properties = new Properties();
         properties.put(PATH_KEY, "/config.yml");
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setUseEtag(false);
         when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
@@ -147,6 +182,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
     @Test
     public void testNoUpdateWithPath() throws IOException {
         Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         properties.put(PATH_KEY, "/config.yml");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setUseEtag(false);
@@ -160,6 +196,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
     @Test
     public void testUseEtagWithPath() throws IOException {
         Properties properties = new Properties();
+        properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
         properties.put(PATH_KEY, "/config.yml");
         pullHttpChangeIngestorInit(properties);
         pullHttpChangeIngestor.setLastEtag("");
@@ -187,7 +224,6 @@ public abstract class PullHttpChangeIngestorCommonTest {
             this.pathResponse = pathResponse;
         }
 
-
         @Override
         public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
                 throws IOException, ServletException {
@@ -199,9 +235,10 @@ public abstract class PullHttpChangeIngestorCommonTest {
                 if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){
                     writeOutput(response, null, 304);
                 } else {
-
                     if ("/config.yml".equals(baseRequest.getPathInfo())) {
                         writeOutput(response, pathResponse, 200);
+                    } else if ("/config-minimal.yml".equals(baseRequest.getPathInfo())) {
+                        writeFileOutput(response, new File("src/test/resources/config-minimal.yml"), 200);
                     } else {
                         writeOutput(response, configResponse, 200);
                     }
@@ -227,5 +264,16 @@ public abstract class PullHttpChangeIngestorCommonTest {
             }
         }
 
+        private void writeFileOutput(HttpServletResponse response, File file, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setHeader("ETag", ETAG);
+            if (file != null) {
+                response.setContentType("text/plain");
+                response.setContentLength((int) file.length());
+                response.setCharacterEncoding(StandardCharsets.UTF_8.displayName());
+                FileUtils.copyFile(file, response.getOutputStream(), true, true);
+            }
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/270a3081/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index 0b33825..d871ffd 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -197,6 +197,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         return securityProperties;
     }
 
+    public void setSecurityProperties(SecurityPropertiesSchema securityProperties) {
+        this.securityProperties = securityProperties;
+    }
+
     public ProcessGroupSchema getProcessGroupSchema() {
         return processGroupSchema;
     }

Reply via email to