This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a3c1cd0  NIFI-8499 - Added encrypted FlowFile repository swap file 
implementation
a3c1cd0 is described below

commit a3c1cd074bafe6ed1975cef2a08793b2a47e4dff
Author: Paul Grey <[email protected]>
AuthorDate: Thu Jun 3 15:37:27 2021 -0400

    NIFI-8499 - Added encrypted FlowFile repository swap file implementation
    
    This closes #5122
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../src/main/asciidoc/administration-guide.adoc    |   6 +-
 .../controller/EncryptedFileSystemSwapManager.java | 104 +++++++++++++++++++
 .../nifi/controller/FileSystemSwapManager.java     |  21 ++--
 ....nifi.controller.repository.FlowFileSwapManager |   3 +-
 .../TestEncryptedFileSystemSwapManager.java        | 115 +++++++++++++++++++++
 5 files changed, 239 insertions(+), 10 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6b68282..52d8008 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -427,7 +427,7 @@ To enable authentication via SAML the following properties 
must be configured in
 |`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion 
attribute containing group names the user belongs to. This property is 
optional, but if populated the groups will be passed along to the authorization 
process.
 |`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the 
generated service provider metadata.
 |`nifi.security.user.saml.request.signing.enabled`| Controls the value of 
`AuthnRequestsSigned` in the generated service provider metadata from 
`nifi-api/access/saml/metadata`. This indicates that the service provider (i.e. 
NiFi) should not sign authentication requests sent to the identity provider, 
but the requests may still need to be signed if the identity provider indicates 
`WantAuthnRequestSigned=true`.
-|`nifi.security.user.saml.want.assertions.signed`| Controls the value of 
`WantAssertionsSigned` in the generated service provider metadata from 
`nifi-api/access/saml/metadata`. This indictaes that the identity provider 
should sign assertions, but some identity providers may provide their own 
configuration for controlling whether assertions are signed.
+|`nifi.security.user.saml.want.assertions.signed`| Controls the value of 
`WantAssertionsSigned` in the generated service provider metadata from 
`nifi-api/access/saml/metadata`. This indicates that the identity provider 
should sign assertions, but some identity providers may provide their own 
configuration for controlling whether assertions are signed.
 |`nifi.security.user.saml.signature.algorithm`| The algorithm to use when 
signing SAML messages. Reference the 
link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
 SAML Signature Constants] for a list of valid values. If not specified, a 
default of SHA-256 will be used.
 |`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to 
use when signing SAML messages. Reference the 
link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
 SAML Signature Constants] for a list of valid values. If not specified, a 
default of SHA-256 will be used.
 |`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML 
messages for debugging purposes.
@@ -2961,7 +2961,9 @@ available again. These properties govern how that process 
occurs.
 
 |====
 |*Property*|*Description*
-|`nifi.swap.manager.implementation`|The Swap Manager implementation. The 
default value is `org.apache.nifi.controller.FileSystemSwapManager` and should 
not be changed.
+|`nifi.swap.manager.implementation`| The Swap Manager implementation. The 
default value is `org.apache.nifi.controller.FileSystemSwapManager`.
+There is an alternate implementation, `EncryptedFileSystemSwapManager`, that 
encrypts the swap file content on
+disk.  The encryption key configured for the FlowFile repository is used to 
perform the encryption, using the AES-GCM algorithm.
 |`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap 
FlowFile information to disk. The default value is `20000`.
 |====
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6e064e6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.KeyProvider;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import 
org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.GCMParameterSpec;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+/**
+ * <p>
+ * An implementation of {@link FlowFileSwapManager} that swaps FlowFiles
+ * to/from local disk.  The swap file is encrypted using AES/GCM, using the
+ * encryption key defined in nifi.properties for the FlowFile repository.
+ * </p>
+ */
+public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
+
+    private static final String CIPHER_TRANSFORMATION = "AES/GCM/NoPadding";
+    private static final int SIZE_IV_AES_BYTES = 16;
+    private static final int SIZE_TAG_GCM_BITS = 128;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EncryptedFileSystemSwapManager.class);
+    private static final SecureRandom secureRandom = new SecureRandom();
+
+    private final SecretKey secretKey;
+
+    public EncryptedFileSystemSwapManager(final NiFiProperties nifiProperties)
+            throws IOException, EncryptionException, GeneralSecurityException {
+        super(nifiProperties);
+        // acquire reference to FlowFileRepository key
+        final FlowFileRepositoryEncryptionConfiguration configuration = new 
FlowFileRepositoryEncryptionConfiguration(nifiProperties);
+        if 
(!CryptoUtils.isValidRepositoryEncryptionConfiguration(configuration)) {
+            logger.error("The flowfile repository encryption configuration is 
not valid (see above). Shutting down...");
+            throw new EncryptionException("The flowfile repository encryption 
configuration is not valid");
+        }
+        final KeyProvider keyProvider = 
RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(configuration);
+        this.secretKey = 
keyProvider.getKey(configuration.getEncryptionKeyId());
+    }
+
+    protected InputStream getInputStream(final File file) throws IOException {
+        final FileInputStream fis = new FileInputStream(file);
+        try {
+            final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+            final int ivBytesRead = fis.read(iv);
+            if (ivBytesRead != SIZE_IV_AES_BYTES) {
+                throw new IOException(String.format(
+                        "problem reading IV [expected=%d, actual=%d]", 
SIZE_IV_AES_BYTES, ivBytesRead));
+            }
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.DECRYPT_MODE, secretKey, new 
GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherInputStream(fis, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for 
File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException 
{
+        final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+        secureRandom.nextBytes(iv);
+        final FileOutputStream fos = new FileOutputStream(file);
+        fos.write(iv);
+        try {
+            final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+            cipher.init(Cipher.ENCRYPT_MODE, secretKey, new 
GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+            return new CipherOutputStream(fos, cipher);
+        } catch (GeneralSecurityException e) {
+            throw new IOException(String.format("Preparing Cipher Failed for 
File [%s]", file.getAbsolutePath()), e);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 8885877..b01bdc9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -119,6 +119,13 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         this.flowFileRepository = 
initializationContext.getFlowFileRepository();
     }
 
+    protected InputStream getInputStream(final File file) throws IOException {
+        return new FileInputStream(file);
+    }
+
+    protected OutputStream getOutputStream(final File file) throws IOException 
{
+        return new FileOutputStream(file);
+    }
 
     @Override
     public String swapOut(final List<FlowFileRecord> toSwap, final 
FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
@@ -135,14 +142,14 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         final String swapLocation = swapFile.getAbsolutePath();
 
         final SwapSerializer serializer = new SchemaSwapSerializer();
-        try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
-            final OutputStream out = new BufferedOutputStream(fos)) {
+        try (final OutputStream os = getOutputStream(swapTempFile);
+            final OutputStream out = new BufferedOutputStream(os)) {
             out.write(MAGIC_HEADER);
             final DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(serializer.getSerializationName());
 
             serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, 
out);
-            fos.getFD().sync();
+            out.flush();
         } catch (final IOException ioe) {
             // we failed to write out the entire swap file. Delete the 
temporary file, if we can.
             swapTempFile.delete();
@@ -188,8 +195,8 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             throw new FileNotFoundException("Failed to swap in FlowFiles from 
external storage location " + swapLocation + " into FlowFile Queue because the 
file could not be found");
         }
 
-        try (final InputStream fis = new FileInputStream(swapFile);
-                final InputStream bis = new BufferedInputStream(fis);
+        try (final InputStream is = getInputStream(swapFile);
+                final InputStream bis = new BufferedInputStream(is);
                 final DataInputStream in = new DataInputStream(bis)) {
 
             final SwapDeserializer deserializer = createSwapDeserializer(in);
@@ -318,7 +325,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             }
 
             // Read the queue identifier from the swap file to check if the 
swap file is for this queue
-            try (final InputStream fis = new FileInputStream(swapFile);
+            try (final InputStream fis = getInputStream(swapFile);
                     final InputStream bufferedIn = new 
BufferedInputStream(fis);
                     final DataInputStream in = new 
DataInputStream(bufferedIn)) {
 
@@ -351,7 +358,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         final File swapFile = new File(swapLocation);
 
         // read record from disk via the swap file
-        try (final InputStream fis = new FileInputStream(swapFile);
+        try (final InputStream fis = getInputStream(swapFile);
                 final InputStream bufferedIn = new BufferedInputStream(fis);
                 final DataInputStream in = new DataInputStream(bufferedIn)) {
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
index e5c63ac..b0ba624 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.controller.FileSystemSwapManager
\ No newline at end of file
+org.apache.nifi.controller.FileSystemSwapManager
+org.apache.nifi.controller.EncryptedFileSystemSwapManager
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6ef5115
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.StaticKeyProvider;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link EncryptedFileSystemSwapManager}.
+ */
+public class TestEncryptedFileSystemSwapManager {
+    private static final Logger logger = 
Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
+
+    /**
+     * Test a simple swap to disk / swap from disk operation.  Configured to 
use {@link StaticKeyProvider}.
+     */
+    @Test
+    public void testSwapOutSwapIn() throws GeneralSecurityException, 
EncryptionException, IOException {
+        // use temp folder on filesystem to temporarily hold swap content 
(clean up after test)
+        final File folderRepository = 
Files.createTempDirectory(getClass().getSimpleName()).toFile();
+        logger.info(folderRepository.getPath());
+        folderRepository.deleteOnExit();
+        new File(folderRepository, "swap").deleteOnExit();
+
+        // configure a nifi properties for encrypted swap file
+        final Properties properties = new Properties();
+        
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
 StaticKeyProvider.class.getName());
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID, 
NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY, 
StringUtils.repeat("00", 32));
+        properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, 
folderRepository.getPath());
+        final NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties(null, properties);
+
+        // generate some flow file content to swap to disk
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        for (int i = 0; (i < 100); ++i) {
+            flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
+        }
+
+        // setup for test case
+        final FlowFileSwapManager swapManager = 
createSwapManager(nifiProperties);
+        final String queueIdentifier = UUID.randomUUID().toString();
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
+
+        // swap out to disk; pull content back from disk
+        final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue, 
"partition-1");
+        final SwapContents swapContents = swapManager.swapIn(swapPath, 
flowFileQueue);
+
+        // verify recovery of original content
+        final List<FlowFileRecord> flowFilesRecovered = 
swapContents.getFlowFiles();
+        Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
+        Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
+        Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
+    }
+
+    /**
+     * Borrowed from 
"nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
+     */
+    private FlowFileSwapManager createSwapManager(NiFiProperties 
nifiProperties)
+            throws IOException, GeneralSecurityException, EncryptionException {
+        final FlowFileRepository flowFileRepo = 
Mockito.mock(FlowFileRepository.class);
+        when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
+
+        final FileSystemSwapManager swapManager = new 
EncryptedFileSystemSwapManager(nifiProperties);
+        final ResourceClaimManager resourceClaimManager = 
Mockito.mock(ResourceClaimManager.class);
+        final SwapManagerInitializationContext context = 
Mockito.mock(SwapManagerInitializationContext.class);
+        
when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
+        when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
+        when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
+        swapManager.initialize(context);
+
+        return swapManager;
+    }
+}

Reply via email to