This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a3c1cd0 NIFI-8499 - Added encrypted FlowFile repository swap file
implementation
a3c1cd0 is described below
commit a3c1cd074bafe6ed1975cef2a08793b2a47e4dff
Author: Paul Grey <[email protected]>
AuthorDate: Thu Jun 3 15:37:27 2021 -0400
NIFI-8499 - Added encrypted FlowFile repository swap file implementation
This closes #5122
Signed-off-by: David Handermann <[email protected]>
---
.../src/main/asciidoc/administration-guide.adoc | 6 +-
.../controller/EncryptedFileSystemSwapManager.java | 104 +++++++++++++++++++
.../nifi/controller/FileSystemSwapManager.java | 21 ++--
....nifi.controller.repository.FlowFileSwapManager | 3 +-
.../TestEncryptedFileSystemSwapManager.java | 115 +++++++++++++++++++++
5 files changed, 239 insertions(+), 10 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6b68282..52d8008 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -427,7 +427,7 @@ To enable authentication via SAML the following properties
must be configured in
|`nifi.security.user.saml.group.attribute.name`| The name of a SAML assertion
attribute containing group names the user belongs to. This property is
optional, but if populated the groups will be passed along to the authorization
process.
|`nifi.security.user.saml.metadata.signing.enabled`| Enables signing of the
generated service provider metadata.
|`nifi.security.user.saml.request.signing.enabled`| Controls the value of
`AuthnRequestsSigned` in the generated service provider metadata from
`nifi-api/access/saml/metadata`. This indicates that the service provider (i.e.
NiFi) should not sign authentication requests sent to the identity provider,
but the requests may still need to be signed if the identity provider indicates
`WantAuthnRequestSigned=true`.
-|`nifi.security.user.saml.want.assertions.signed`| Controls the value of
`WantAssertionsSigned` in the generated service provider metadata from
`nifi-api/access/saml/metadata`. This indictaes that the identity provider
should sign assertions, but some identity providers may provide their own
configuration for controlling whether assertions are signed.
+|`nifi.security.user.saml.want.assertions.signed`| Controls the value of
`WantAssertionsSigned` in the generated service provider metadata from
`nifi-api/access/saml/metadata`. This indicates that the identity provider
should sign assertions, but some identity providers may provide their own
configuration for controlling whether assertions are signed.
|`nifi.security.user.saml.signature.algorithm`| The algorithm to use when
signing SAML messages. Reference the
link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
SAML Signature Constants] for a list of valid values. If not specified, a
default of SHA-256 will be used.
|`nifi.security.user.saml.signature.digest.algorithm`| The digest algorithm to
use when signing SAML messages. Reference the
link:https://git.shibboleth.net/view/?p=java-xmltooling.git;a=blob;f=src/main/java/org/opensaml/xml/signature/SignatureConstants.java[Open
SAML Signature Constants] for a list of valid values. If not specified, a
default of SHA-256 will be used.
|`nifi.security.user.saml.message.logging.enabled`| Enables logging of SAML
messages for debugging purposes.
@@ -2961,7 +2961,9 @@ available again. These properties govern how that process
occurs.
|====
|*Property*|*Description*
-|`nifi.swap.manager.implementation`|The Swap Manager implementation. The
default value is `org.apache.nifi.controller.FileSystemSwapManager` and should
not be changed.
+|`nifi.swap.manager.implementation`| The Swap Manager implementation. The
default value is `org.apache.nifi.controller.FileSystemSwapManager`.
+There is an alternate implementation,
`org.apache.nifi.controller.EncryptedFileSystemSwapManager`, that encrypts the
swap file content on
+disk. The encryption key configured for the FlowFile repository is used to
perform the encryption, using the AES-GCM algorithm.
|`nifi.queue.swap.threshold`|The queue threshold at which NiFi starts to swap
FlowFile information to disk. The default value is `20000`.
|====
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6e064e6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EncryptedFileSystemSwapManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.KeyProvider;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import
org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.GCMParameterSpec;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+/**
+ * <p>
+ * An implementation of {@link FlowFileSwapManager} that swaps FlowFiles
+ * to/from local disk. The swap file is encrypted using AES/GCM, using the
+ * encryption key defined in nifi.properties for the FlowFile repository.
+ * </p>
+ */
+public class EncryptedFileSystemSwapManager extends FileSystemSwapManager {
+
+ private static final String CIPHER_TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int SIZE_IV_AES_BYTES = 16;
+ private static final int SIZE_TAG_GCM_BITS = 128;
+
+ private static final Logger logger =
LoggerFactory.getLogger(EncryptedFileSystemSwapManager.class);
+ private static final SecureRandom secureRandom = new SecureRandom();
+
+ private final SecretKey secretKey;
+
+ public EncryptedFileSystemSwapManager(final NiFiProperties nifiProperties)
+ throws IOException, EncryptionException, GeneralSecurityException {
+ super(nifiProperties);
+ // acquire reference to FlowFileRepository key
+ final FlowFileRepositoryEncryptionConfiguration configuration = new
FlowFileRepositoryEncryptionConfiguration(nifiProperties);
+ if
(!CryptoUtils.isValidRepositoryEncryptionConfiguration(configuration)) {
+ logger.error("The flowfile repository encryption configuration is
not valid (see above). Shutting down...");
+ throw new EncryptionException("The flowfile repository encryption
configuration is not valid");
+ }
+ final KeyProvider keyProvider =
RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(configuration);
+ this.secretKey =
keyProvider.getKey(configuration.getEncryptionKeyId());
+ }
+
+ protected InputStream getInputStream(final File file) throws IOException {
+ final FileInputStream fis = new FileInputStream(file);
+ try {
+ final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+ final int ivBytesRead = fis.read(iv);
+ if (ivBytesRead != SIZE_IV_AES_BYTES) {
+ throw new IOException(String.format(
+ "problem reading IV [expected=%d, actual=%d]",
SIZE_IV_AES_BYTES, ivBytesRead));
+ }
+ final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+ cipher.init(Cipher.DECRYPT_MODE, secretKey, new
GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+ return new CipherInputStream(fis, cipher);
+ } catch (GeneralSecurityException e) {
+ throw new IOException(String.format("Preparing Cipher Failed for
File [%s]", file.getAbsolutePath()), e);
+ }
+ }
+
+ protected OutputStream getOutputStream(final File file) throws IOException
{
+ final byte[] iv = new byte[SIZE_IV_AES_BYTES];
+ secureRandom.nextBytes(iv);
+ final FileOutputStream fos = new FileOutputStream(file);
+ fos.write(iv);
+ try {
+ final Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION);
+ cipher.init(Cipher.ENCRYPT_MODE, secretKey, new
GCMParameterSpec(SIZE_TAG_GCM_BITS, iv));
+ return new CipherOutputStream(fos, cipher);
+ } catch (GeneralSecurityException e) {
+ throw new IOException(String.format("Preparing Cipher Failed for
File [%s]", file.getAbsolutePath()), e);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 8885877..b01bdc9 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -119,6 +119,13 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
this.flowFileRepository =
initializationContext.getFlowFileRepository();
}
+ protected InputStream getInputStream(final File file) throws IOException { // extension hook: every swap file read goes through this method, so a subclass (e.g. EncryptedFileSystemSwapManager) can wrap the raw bytes
+ return new FileInputStream(file); // default: the file's bytes, unmodified
+ }
+
+ protected OutputStream getOutputStream(final File file) throws IOException
{ // extension hook: the swap file is written through this method, so a subclass (e.g. EncryptedFileSystemSwapManager) can wrap the raw bytes
+ return new FileOutputStream(file); // default: bytes are written to the file unmodified
+ }
@Override
public String swapOut(final List<FlowFileRecord> toSwap, final
FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
@@ -135,14 +142,14 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
final String swapLocation = swapFile.getAbsolutePath();
final SwapSerializer serializer = new SchemaSwapSerializer();
- try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
- final OutputStream out = new BufferedOutputStream(fos)) {
+ try (final OutputStream os = getOutputStream(swapTempFile);
+ final OutputStream out = new BufferedOutputStream(os)) {
out.write(MAGIC_HEADER);
final DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(serializer.getSerializationName());
serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation,
out);
- fos.getFD().sync();
+ out.flush();
} catch (final IOException ioe) {
// we failed to write out the entire swap file. Delete the
temporary file, if we can.
swapTempFile.delete();
@@ -188,8 +195,8 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
throw new FileNotFoundException("Failed to swap in FlowFiles from
external storage location " + swapLocation + " into FlowFile Queue because the
file could not be found");
}
- try (final InputStream fis = new FileInputStream(swapFile);
- final InputStream bis = new BufferedInputStream(fis);
+ try (final InputStream is = getInputStream(swapFile);
+ final InputStream bis = new BufferedInputStream(is);
final DataInputStream in = new DataInputStream(bis)) {
final SwapDeserializer deserializer = createSwapDeserializer(in);
@@ -318,7 +325,7 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
}
// Read the queue identifier from the swap file to check if the
swap file is for this queue
- try (final InputStream fis = new FileInputStream(swapFile);
+ try (final InputStream fis = getInputStream(swapFile);
final InputStream bufferedIn = new
BufferedInputStream(fis);
final DataInputStream in = new
DataInputStream(bufferedIn)) {
@@ -351,7 +358,7 @@ public class FileSystemSwapManager implements
FlowFileSwapManager {
final File swapFile = new File(swapLocation);
// read record from disk via the swap file
- try (final InputStream fis = new FileInputStream(swapFile);
+ try (final InputStream fis = getInputStream(swapFile);
final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
index e5c63ac..b0ba624 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.repository.FlowFileSwapManager
@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.controller.FileSystemSwapManager
\ No newline at end of file
+org.apache.nifi.controller.FileSystemSwapManager
+org.apache.nifi.controller.EncryptedFileSystemSwapManager
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
new file mode 100644
index 0000000..6ef5115
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestEncryptedFileSystemSwapManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.StaticKeyProvider;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link EncryptedFileSystemSwapManager}.
+ */
+public class TestEncryptedFileSystemSwapManager {
+ private static final Logger logger =
Logger.getLogger(TestEncryptedFileSystemSwapManager.class.getName());
+
+ /**
+ * Test a simple swap to disk / swap from disk operation. Configured to
use {@link StaticKeyProvider}.
+ */
+ @Test
+ public void testSwapOutSwapIn() throws GeneralSecurityException,
EncryptionException, IOException {
+ // use temp folder on filesystem to temporarily hold swap content
(clean up after test)
+ final File folderRepository =
Files.createTempDirectory(getClass().getSimpleName()).toFile();
+ logger.info(folderRepository.getPath());
+ folderRepository.deleteOnExit();
+ new File(folderRepository, "swap").deleteOnExit();
+
+ // configure a nifi properties for encrypted swap file
+ final Properties properties = new Properties();
+
properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
StaticKeyProvider.class.getName());
+ properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID,
NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY);
+ properties.put(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY,
StringUtils.repeat("00", 32));
+ properties.put(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY,
folderRepository.getPath());
+ final NiFiProperties nifiProperties =
NiFiProperties.createBasicNiFiProperties(null, properties);
+
+ // generate some flow file content to swap to disk
+ final List<FlowFileRecord> flowFiles = new ArrayList<>();
+ for (int i = 0; (i < 100); ++i) {
+ flowFiles.add(new StandardFlowFileRecord.Builder().id(i).build());
+ }
+
+ // setup for test case
+ final FlowFileSwapManager swapManager =
createSwapManager(nifiProperties);
+ final String queueIdentifier = UUID.randomUUID().toString();
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ when(flowFileQueue.getIdentifier()).thenReturn(queueIdentifier);
+
+ // swap out to disk; pull content back from disk
+ final String swapPath = swapManager.swapOut(flowFiles, flowFileQueue,
"partition-1");
+ final SwapContents swapContents = swapManager.swapIn(swapPath,
flowFileQueue);
+
+ // verify recovery of original content
+ final List<FlowFileRecord> flowFilesRecovered =
swapContents.getFlowFiles();
+ Assert.assertEquals(flowFiles.size(), flowFilesRecovered.size());
+ Assert.assertTrue(flowFilesRecovered.containsAll(flowFiles));
+ Assert.assertTrue(flowFiles.containsAll(flowFilesRecovered));
+ }
+
+ /**
+ * Borrowed from
"nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java".
+ */
+ private FlowFileSwapManager createSwapManager(NiFiProperties
nifiProperties)
+ throws IOException, GeneralSecurityException, EncryptionException {
+ final FlowFileRepository flowFileRepo =
Mockito.mock(FlowFileRepository.class);
+ when(flowFileRepo.isValidSwapLocationSuffix(any())).thenReturn(true);
+
+ final FileSystemSwapManager swapManager = new
EncryptedFileSystemSwapManager(nifiProperties);
+ final ResourceClaimManager resourceClaimManager =
Mockito.mock(ResourceClaimManager.class);
+ final SwapManagerInitializationContext context =
Mockito.mock(SwapManagerInitializationContext.class);
+
when(context.getResourceClaimManager()).thenReturn(resourceClaimManager);
+ when(context.getFlowFileRepository()).thenReturn(flowFileRepo);
+ when(context.getEventReporter()).thenReturn(EventReporter.NO_OP);
+ swapManager.initialize(context);
+
+ return swapManager;
+ }
+}