http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/main/java/org/apache/nifi/properties/ProtectedNiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/main/java/org/apache/nifi/properties/ProtectedNiFiProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/main/java/org/apache/nifi/properties/ProtectedNiFiProperties.java index 4774dc7..fc1d722 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/main/java/org/apache/nifi/properties/ProtectedNiFiProperties.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/main/java/org/apache/nifi/properties/ProtectedNiFiProperties.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; @@ -50,7 +51,7 @@ class ProtectedNiFiProperties extends StandardNiFiProperties { // Default list of "sensitive" property keys public static final List<String> DEFAULT_SENSITIVE_PROPERTIES = new ArrayList<>(asList(SECURITY_KEY_PASSWD, - SECURITY_KEYSTORE_PASSWD, SECURITY_TRUSTSTORE_PASSWD, SENSITIVE_PROPS_KEY)); + SECURITY_KEYSTORE_PASSWD, SECURITY_TRUSTSTORE_PASSWD, SENSITIVE_PROPS_KEY, PROVENANCE_REPO_ENCRYPTION_KEY)); public ProtectedNiFiProperties() { this(new StandardNiFiProperties()); @@ -184,6 +185,17 @@ class ProtectedNiFiProperties extends StandardNiFiProperties { } /** + * Returns a list of the keys identifying "sensitive" properties. There is a default list, + * and additional keys can be provided in the {@code nifi.sensitive.props.additional.keys} property in {@code nifi.properties}. + * + * @return the list of sensitive property keys + */ + public List<String> getPopulatedSensitivePropertyKeys() { + List<String> allSensitiveKeys = getSensitivePropertyKeys(); + return allSensitiveKeys.stream().filter(k -> StringUtils.isNotBlank(getProperty(k))).collect(Collectors.toList()); + } + + /** * Returns true if any sensitive keys are protected. * * @return true if any key is protected; false otherwise @@ -219,7 +231,7 @@ class ProtectedNiFiProperties extends StandardNiFiProperties { Map<String, String> traditionalProtectedProperties = new HashMap<>(); for (String key : sensitiveKeys) { String protection = getProperty(getProtectionKey(key)); - if (!StringUtils.isBlank(protection)) { + if (StringUtils.isNotBlank(protection) && StringUtils.isNotBlank(getProperty(key))) { traditionalProtectedProperties.put(key, protection); } } @@ -237,12 +249,12 @@ class ProtectedNiFiProperties extends StandardNiFiProperties { } /** - * Returns a percentage of the total number of properties marked as sensitive that are currently protected. + * Returns a percentage of the total number of populated properties marked as sensitive that are currently protected. * * @return the percent of sensitive properties marked as protected */ public int getPercentOfSensitivePropertiesProtected() { - return (int) Math.round(getProtectedPropertyKeys().size() / ((double) getSensitivePropertyKeys().size()) * 100); + return (int) Math.round(getProtectedPropertyKeys().size() / ((double) getPopulatedSensitivePropertyKeys().size()) * 100); } /** @@ -421,9 +433,7 @@ class ProtectedNiFiProperties extends StandardNiFiProperties { // Add the protected keys and the protection schemes for (String key : getSensitivePropertyKeys()) { final String plainValue = getInternalNiFiProperties().getProperty(key); - if (plainValue == null || plainValue.trim().isEmpty()) { - protectedProperties.setProperty(key, plainValue); - } else { + if (plainValue != null && !plainValue.trim().isEmpty()) { final String protectedValue = spp.protect(plainValue); protectedProperties.setProperty(key, protectedValue); protectedProperties.setProperty(getProtectionKey(key), protectionScheme);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/AESSensitivePropertyProviderTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/AESSensitivePropertyProviderTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/AESSensitivePropertyProviderTest.groovy index 7896afe..73ae55a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/AESSensitivePropertyProviderTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/AESSensitivePropertyProviderTest.groovy @@ -52,7 +52,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { private static final Base64.Decoder decoder = Base64.decoder @BeforeClass - public static void setUpOnce() throws Exception { + static void setUpOnce() throws Exception { Security.addProvider(new BouncyCastleProvider()) logger.metaClass.methodMissing = { String name, args -> @@ -61,12 +61,12 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Before - public void setUp() throws Exception { + void setUp() throws Exception { } @After - public void tearDown() throws Exception { + void tearDown() throws Exception { } @@ -112,7 +112,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldThrowExceptionOnInitializationWithoutBouncyCastle() throws Exception { + void testShouldThrowExceptionOnInitializationWithoutBouncyCastle() throws Exception { // Arrange try { Security.removeProvider(new BouncyCastleProvider().getName()) @@ -133,7 +133,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { // TODO: testShouldGetName() @Test - public void testShouldProtectValue() throws Exception { + void testShouldProtectValue() throws Exception { final String PLAINTEXT = "This is a plaintext value" // Act @@ -163,7 +163,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldHandleProtectEmptyValue() throws Exception { + void testShouldHandleProtectEmptyValue() throws Exception { final List<String> EMPTY_PLAINTEXTS = ["", " ", null] // Act @@ -183,7 +183,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldUnprotectValue() throws Exception { + void testShouldUnprotectValue() throws Exception { // Arrange final String PLAINTEXT = "This is a plaintext value" @@ -218,7 +218,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldHandleUnprotectEmptyValue() throws Exception { + void testShouldHandleUnprotectEmptyValue() throws Exception { // Arrange final List<String> EMPTY_CIPHER_TEXTS = ["", " ", null] @@ -239,7 +239,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldUnprotectValueWithWhitespace() throws Exception { + void testShouldUnprotectValueWithWhitespace() throws Exception { // Arrange final String PLAINTEXT = "This is a plaintext value" @@ -269,7 +269,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldHandleUnprotectMalformedValue() throws Exception { + void testShouldHandleUnprotectMalformedValue() throws Exception { // Arrange final String PLAINTEXT = "This is a plaintext value" @@ -293,7 +293,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldHandleUnprotectMissingIV() throws Exception { + void testShouldHandleUnprotectMissingIV() throws Exception { // Arrange final String PLAINTEXT = "This is a plaintext value" @@ -334,7 +334,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldHandleUnprotectEmptyCipherText() throws Exception { + void testShouldHandleUnprotectEmptyCipherText() throws Exception { // Arrange final String IV_AND_DELIMITER = "${encoder.encodeToString("Bad IV value".getBytes(StandardCharsets.UTF_8))}||" logger.info("IV and delimiter: ${IV_AND_DELIMITER}") @@ -358,7 +358,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldHandleUnprotectMalformedIV() throws Exception { + void testShouldHandleUnprotectMalformedIV() throws Exception { // Arrange final String PLAINTEXT = "This is a plaintext value" @@ -382,7 +382,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldGetIdentifierKeyWithDifferentMaxKeyLengths() throws Exception { + void testShouldGetIdentifierKeyWithDifferentMaxKeyLengths() throws Exception { // Arrange def keys = getAvailableKeySizes().collectEntries { int keySize -> [(keySize): getKeyOfSize(keySize)] @@ -400,7 +400,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldNotAllowEmptyKey() throws Exception { + void testShouldNotAllowEmptyKey() throws Exception { // Arrange final String INVALID_KEY = "" @@ -414,7 +414,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldNotAllowIncorrectlySizedKey() throws Exception { + void testShouldNotAllowIncorrectlySizedKey() throws Exception { // Arrange final String INVALID_KEY = "Z" * 31 @@ -428,7 +428,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { } @Test - public void testShouldNotAllowInvalidKey() throws Exception { + void testShouldNotAllowInvalidKey() throws Exception { // Arrange final String INVALID_KEY = "Z" * 32 @@ -445,7 +445,7 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { * This test is to ensure internal consistency and allow for encrypting value for various property files */ @Test - public void testShouldEncryptArbitraryValues() { + void testShouldEncryptArbitraryValues() { // Arrange def values = ["thisIsABadPassword", "thisIsABadSensitiveKeyPassword", "thisIsABadKeystorePassword", "thisIsABadKeyPassword", "thisIsABadTruststorePassword", "This is an encrypted banner message", "nififtw!"] @@ -471,15 +471,15 @@ class AESSensitivePropertyProviderTest extends GroovyTestCase { * This test is to ensure external compatibility in case someone encodes the encrypted value with Base64 and does not remove the padding */ @Test - public void testShouldDecryptPaddedValue() { + void testShouldDecryptPaddedValue() { // Arrange Assume.assumeTrue("JCE unlimited strength crypto policy must be installed for this test", Cipher.getMaxAllowedKeyLength("AES") > 128) - final String EXPECTED_VALUE = "thisIsABadKeyPassword" - String cipherText = "ac/BaE35SL/esLiJ||+ULRvRLYdIDA2VqpE0eQXDEMjaLBMG2kbKOdOwBk/hGebDKlVg==" + final String EXPECTED_VALUE = getKeyOfSize(256) // "thisIsABadKeyPassword" + String cipherText = "aYDkDKys1ENr3gp+||sTBPpMlIvHcOLTGZlfWct8r9RY8BuDlDkoaYmGJ/9m9af9tZIVzcnDwvYQAaIKxRGF7vI2yrY7Xd6x9GTDnWGiGiRXlaP458BBMMgfzH2O8" String unpaddedCipherText = cipherText.replaceAll("=", "") - String key = getKeyOfSize(256) + String key = "AAAABBBBCCCCDDDDEEEEFFFF00001111" * 2 // getKeyOfSize(256) SensitivePropertyProvider spp = new AESSensitivePropertyProvider(key) http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/ProtectedNiFiPropertiesGroovyTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/ProtectedNiFiPropertiesGroovyTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/ProtectedNiFiPropertiesGroovyTest.groovy index 0d5c976..6656867 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/ProtectedNiFiPropertiesGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/ProtectedNiFiPropertiesGroovyTest.groovy @@ -38,7 +38,8 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { "nifi.sensitive.props.key", "nifi.security.keystorePasswd", "nifi.security.keyPasswd", - "nifi.security.truststorePasswd" + "nifi.security.truststorePasswd", + "nifi.provenance.repository.encryption.key" ] final def COMMON_ADDITIONAL_SENSITIVE_PROPERTIES = [ @@ -53,7 +54,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { private static String originalPropertiesPath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) @BeforeClass - public static void setUpOnce() throws Exception { + static void setUpOnce() throws Exception { Security.addProvider(new BouncyCastleProvider()) logger.metaClass.methodMissing = { String name, args -> @@ -62,15 +63,15 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Before - public void setUp() throws Exception { + void setUp() throws Exception { } @After - public void tearDown() throws Exception { + void tearDown() throws Exception { } @AfterClass - public static void tearDownOnce() { + static void tearDownOnce() { if (originalPropertiesPath) { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, originalPropertiesPath) } @@ -127,7 +128,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testConstructorShouldCreateNewInstance() throws Exception { + void testConstructorShouldCreateNewInstance() throws Exception { // Arrange // Act @@ -140,7 +141,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testConstructorShouldAcceptRawProperties() throws Exception { + void testConstructorShouldAcceptRawProperties() throws Exception { // Arrange Properties rawProperties = new Properties() rawProperties.setProperty("key", "value") @@ -157,7 +158,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testConstructorShouldAcceptNiFiProperties() throws Exception { + void testConstructorShouldAcceptNiFiProperties() throws Exception { // Arrange Properties rawProperties = new Properties() rawProperties.setProperty("key", "value") @@ -178,7 +179,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldAllowMultipleInstances() throws Exception { + void testShouldAllowMultipleInstances() throws Exception { // Arrange Properties rawProperties = new Properties() rawProperties.setProperty("key", "value") @@ -200,7 +201,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldDetectIfPropertyIsSensitive() throws Exception { + void testShouldDetectIfPropertyIsSensitive() throws Exception { // Arrange final String INSENSITIVE_PROPERTY_KEY = "nifi.ui.banner.text" final String SENSITIVE_PROPERTY_KEY = "nifi.security.keystorePasswd" @@ -219,7 +220,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldGetDefaultSensitiveProperties() throws Exception { + void testShouldGetDefaultSensitiveProperties() throws Exception { // Arrange logger.expected("${DEFAULT_SENSITIVE_PROPERTIES.size()} default sensitive properties: ${DEFAULT_SENSITIVE_PROPERTIES.join(", ")}") @@ -235,9 +236,9 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldGetAdditionalSensitiveProperties() throws Exception { + void testShouldGetAdditionalSensitiveProperties() throws Exception { // Arrange - def completeSensitiveProperties = DEFAULT_SENSITIVE_PROPERTIES + ["nifi.ui.banner.text"] + def completeSensitiveProperties = DEFAULT_SENSITIVE_PROPERTIES + ["nifi.ui.banner.text", "nifi.version"] logger.expected("${completeSensitiveProperties.size()} total sensitive properties: ${completeSensitiveProperties.join(", ")}") ProtectedNiFiProperties properties = loadFromFile("/conf/nifi_with_additional_sensitive_keys.properties") @@ -254,7 +255,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { // TODO: Add negative tests (fuzz additional.keys property, etc.) @Test - public void testGetAdditionalSensitivePropertiesShouldNotIncludeSelf() throws Exception { + void testGetAdditionalSensitivePropertiesShouldNotIncludeSelf() throws Exception { // Arrange def completeSensitiveProperties = DEFAULT_SENSITIVE_PROPERTIES + ["nifi.ui.banner.text", "nifi.version"] logger.expected("${completeSensitiveProperties.size()} total sensitive properties: ${completeSensitiveProperties.join(", ")}") @@ -275,7 +276,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldGetUnprotectedValueOfSensitiveProperty() throws Exception { + void testShouldGetUnprotectedValueOfSensitiveProperty() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" final String EXPECTED_KEYSTORE_PASSWORD = "thisIsABadKeystorePassword" @@ -301,7 +302,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldGetEmptyUnprotectedValueOfSensitiveProperty() throws Exception { + void testShouldGetEmptyUnprotectedValueOfSensitiveProperty() throws Exception { // Arrange final String TRUSTSTORE_PASSWORD_KEY = "nifi.security.truststorePasswd" final String EXPECTED_TRUSTSTORE_PASSWORD = "" @@ -329,7 +330,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldGetUnprotectedValueOfSensitivePropertyWhenProtected() throws Exception { + void testShouldGetUnprotectedValueOfSensitivePropertyWhenProtected() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" final String EXPECTED_KEYSTORE_PASSWORD = "thisIsABadKeystorePassword" @@ -356,7 +357,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testGetValueOfSensitivePropertyShouldHandleUnknownProtectionScheme() throws Exception { + void testGetValueOfSensitivePropertyShouldHandleUnknownProtectionScheme() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" @@ -390,7 +391,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testGetValueOfSensitivePropertyShouldHandleSingleMalformedValue() throws Exception { + void testGetValueOfSensitivePropertyShouldHandleSingleMalformedValue() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" @@ -425,7 +426,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testGetValueOfSensitivePropertyShouldHandleMultipleMalformedValues() throws Exception { + void testGetValueOfSensitivePropertyShouldHandleMultipleMalformedValues() throws Exception { // Arrange // Raw properties @@ -468,7 +469,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldGetEmptyUnprotectedValueOfSensitivePropertyWithDefault() throws Exception { + void testShouldGetEmptyUnprotectedValueOfSensitivePropertyWithDefault() throws Exception { // Arrange final String TRUSTSTORE_PASSWORD_KEY = "nifi.security.truststorePasswd" final String EXPECTED_TRUSTSTORE_PASSWORD = "" @@ -502,7 +503,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testShouldGetUnprotectedValueOfSensitivePropertyWhenProtectedWithDefault() throws Exception { + void testShouldGetUnprotectedValueOfSensitivePropertyWhenProtectedWithDefault() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" final String EXPECTED_KEYSTORE_PASSWORD = "thisIsABadKeystorePassword" @@ -538,7 +539,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { * @throws Exception */ @Test - public void testGetValueOfSensitivePropertyShouldHandleInvalidatedInternalCache() throws Exception { + void testGetValueOfSensitivePropertyShouldHandleInvalidatedInternalCache() throws Exception { // Arrange final String KEYSTORE_PASSWORD_KEY = "nifi.security.keystorePasswd" final String EXPECTED_KEYSTORE_PASSWORD = "thisIsABadKeystorePassword" @@ -567,7 +568,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldDetectIfPropertyIsProtected() throws Exception { + void testShouldDetectIfPropertyIsProtected() throws Exception { // Arrange final String UNPROTECTED_PROPERTY_KEY = "nifi.security.truststorePasswd" final String PROTECTED_PROPERTY_KEY = "nifi.security.keystorePasswd" @@ -593,7 +594,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldDetectIfPropertyWithEmptyProtectionSchemeIsProtected() throws Exception { + void testShouldDetectIfPropertyWithEmptyProtectionSchemeIsProtected() throws Exception { // Arrange final String UNPROTECTED_PROPERTY_KEY = "nifi.sensitive.props.key" @@ -611,7 +612,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldGetPercentageOfSensitivePropertiesProtected_0() throws Exception { + void testShouldGetPercentageOfSensitivePropertiesProtected_0() throws Exception { // Arrange ProtectedNiFiProperties properties = loadFromFile("/conf/nifi.properties") @@ -620,14 +621,14 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { // Act double percentProtected = properties.getPercentOfSensitivePropertiesProtected() - logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getSensitivePropertyKeys().size()}) protected") + logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getPopulatedSensitivePropertyKeys().size()}) protected") // Assert assert percentProtected == 0.0 } @Test - public void testShouldGetPercentageOfSensitivePropertiesProtected_50() throws Exception { + void testShouldGetPercentageOfSensitivePropertiesProtected_75() throws Exception { // Arrange ProtectedNiFiProperties properties = loadFromFile("/conf/nifi_with_sensitive_properties_protected_aes.properties") @@ -636,14 +637,14 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { // Act double percentProtected = properties.getPercentOfSensitivePropertiesProtected() - logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getSensitivePropertyKeys().size()}) protected") + logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getPopulatedSensitivePropertyKeys().size()}) protected") // Assert - assert percentProtected == 50.0 + assert percentProtected == 75.0 } @Test - public void testShouldGetPercentageOfSensitivePropertiesProtected_100() throws Exception { + void testShouldGetPercentageOfSensitivePropertiesProtected_100() throws Exception { // Arrange ProtectedNiFiProperties properties = loadFromFile("/conf/nifi_with_all_sensitive_properties_protected_aes.properties") @@ -652,14 +653,14 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { // Act double percentProtected = properties.getPercentOfSensitivePropertiesProtected() - logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getSensitivePropertyKeys().size()}) protected") + logger.info("${percentProtected}% (${properties.getProtectedPropertyKeys().size()} of ${properties.getPopulatedSensitivePropertyKeys().size()}) protected") // Assert assert percentProtected == 100.0 } @Test - public void testInstanceWithNoProtectedPropertiesShouldNotLoadSPP() throws Exception { + void testInstanceWithNoProtectedPropertiesShouldNotLoadSPP() throws Exception { // Arrange ProtectedNiFiProperties properties = loadFromFile("/conf/nifi.properties") assert properties.@localProviderCache?.isEmpty() @@ -676,7 +677,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldAddSensitivePropertyProvider() throws Exception { + void testShouldAddSensitivePropertyProvider() throws Exception { // Arrange ProtectedNiFiProperties properties = new ProtectedNiFiProperties() assert properties.getSensitivePropertyProviders().isEmpty() @@ -696,7 +697,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldNotAddNullSensitivePropertyProvider() throws Exception { + void testShouldNotAddNullSensitivePropertyProvider() throws Exception { // Arrange ProtectedNiFiProperties properties = new ProtectedNiFiProperties() assert properties.getSensitivePropertyProviders().isEmpty() @@ -713,7 +714,7 @@ class ProtectedNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldNotAllowOverwriteOfProvider() throws Exception { + void testShouldNotAllowOverwriteOfProvider() throws Exception { // Arrange ProtectedNiFiProperties properties = new ProtectedNiFiProperties() assert properties.getSensitivePropertyProviders().isEmpty() http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy index c9492fb..ae43a3d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy @@ -32,58 +32,60 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { private static final Logger logger = LoggerFactory.getLogger(StandardNiFiPropertiesGroovyTest.class) private static String originalPropertiesPath = System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) + private static final String PREK = NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY + private static final String PREKID = NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID @BeforeClass - public static void setUpOnce() throws Exception { + static void setUpOnce() throws Exception { logger.metaClass.methodMissing = { String name, args -> logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") } } @Before - public void setUp() throws Exception { + void setUp() throws Exception { } @After - public void tearDown() throws Exception { + void tearDown() throws Exception { } @AfterClass - public static void tearDownOnce() { + static void tearDownOnce() { if (originalPropertiesPath) { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, originalPropertiesPath) } } private static StandardNiFiProperties loadFromFile(String propertiesFilePath) { - String filePath; + String filePath try { - filePath = StandardNiFiPropertiesGroovyTest.class.getResource(propertiesFilePath).toURI().getPath(); + filePath = StandardNiFiPropertiesGroovyTest.class.getResource(propertiesFilePath).toURI().getPath() } catch (URISyntaxException ex) { throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); + + ex.getLocalizedMessage(), ex) } - System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath); + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath) - StandardNiFiProperties properties = new StandardNiFiProperties(); + StandardNiFiProperties properties = new StandardNiFiProperties() // clear out existing properties for (String prop : properties.stringPropertyNames()) { - properties.remove(prop); + properties.remove(prop) } - InputStream inStream = null; + InputStream inStream = null try { - inStream = new BufferedInputStream(new FileInputStream(filePath)); - properties.load(inStream); + inStream = new BufferedInputStream(new FileInputStream(filePath)) + properties.load(inStream) } catch (final Exception ex) { throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); + + ex.getLocalizedMessage(), ex) } finally { if (null != inStream) { try { - inStream.close(); + inStream.close() } catch (Exception ex) { /** * do nothing * @@ -92,11 +94,11 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { } } - return properties; + return properties } @Test - public void testConstructorShouldCreateNewInstance() throws Exception { + void testConstructorShouldCreateNewInstance() throws Exception { // Arrange // Act @@ -109,7 +111,7 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testConstructorShouldAcceptRawProperties() throws Exception { + void testConstructorShouldAcceptRawProperties() throws Exception { // Arrange Properties rawProperties = new Properties() rawProperties.setProperty("key", "value") @@ -126,7 +128,7 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { } @Test - public void testShouldAllowMultipleInstances() throws Exception { + void testShouldAllowMultipleInstances() throws Exception { // Arrange Properties rawProperties = new Properties() rawProperties.setProperty("key", "value") @@ -139,7 +141,6 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { NiFiProperties emptyProperties = new StandardNiFiProperties() logger.info("emptyProperties has ${emptyProperties.size()} properties: ${emptyProperties.getPropertyKeys()}") - // Assert assert niFiProperties.size() == 1 assert niFiProperties.getPropertyKeys() == ["key"] as Set @@ -147,4 +148,178 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase { assert emptyProperties.size() == 0 assert emptyProperties.getPropertyKeys() == [] as Set } + + @Test + void testShouldGetProvenanceRepoEncryptionKeyFromDefaultProperty() throws Exception { + // Arrange + Properties rawProperties = new Properties() + final String KEY_ID = "arbitraryKeyId" + final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210" + rawProperties.setProperty(PREKID, KEY_ID) + rawProperties.setProperty(PREK, KEY_HEX) + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == KEY_ID + assert key == KEY_HEX + assert keys == [(KEY_ID): KEY_HEX] + } + + @Test + void testShouldGetProvenanceRepoEncryptionKeysFromMultipleProperties() throws Exception { + // Arrange + Properties rawProperties = new Properties() + final String KEY_ID = "arbitraryKeyId" + final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210" + final String KEY_ID_2 = "arbitraryKeyId2" + final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111" + final String KEY_ID_3 = "arbitraryKeyId3" + final String KEY_HEX_3 = "01010101010101010101010101010101" + + rawProperties.setProperty(PREKID, KEY_ID) + rawProperties.setProperty(PREK, KEY_HEX) + rawProperties.setProperty("${PREK}.id.${KEY_ID_2}", KEY_HEX_2) + rawProperties.setProperty("${PREK}.id.${KEY_ID_3}", KEY_HEX_3) + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == KEY_ID + assert key == KEY_HEX + assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_2, (KEY_ID_3): KEY_HEX_3] + } + + @Test + void testShouldGetProvenanceRepoEncryptionKeysWithNoDefaultDefined() throws Exception { + // Arrange + Properties rawProperties = new Properties() + final String KEY_ID = "arbitraryKeyId" + final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210" + final String KEY_ID_2 = "arbitraryKeyId2" + final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111" + final String KEY_ID_3 = "arbitraryKeyId3" + final String KEY_HEX_3 = "01010101010101010101010101010101" + + rawProperties.setProperty(PREKID, KEY_ID) + rawProperties.setProperty("${PREK}.id.${KEY_ID}", KEY_HEX) + rawProperties.setProperty("${PREK}.id.${KEY_ID_2}", KEY_HEX_2) + rawProperties.setProperty("${PREK}.id.${KEY_ID_3}", KEY_HEX_3) + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == KEY_ID + assert key == KEY_HEX + assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_2, (KEY_ID_3): KEY_HEX_3] + } + + @Test + void testShouldGetProvenanceRepoEncryptionKeysWithNoneDefined() throws Exception { + // Arrange + Properties rawProperties = new Properties() + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == null + assert key == null + assert keys == [:] + } + + @Test + void testShouldNotGetProvenanceRepoEncryptionKeysIfFileBasedKeyProvider() throws Exception { + // Arrange + Properties rawProperties = new Properties() + final String KEY_ID = "arbitraryKeyId" + + rawProperties.setProperty(PREKID, KEY_ID) + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == KEY_ID + assert key == null + assert keys == [:] + } + + @Test + void testGetProvenanceRepoEncryptionKeysShouldFilterOtherProperties() throws Exception { + // Arrange + Properties rawProperties = new Properties() + final String KEY_ID = "arbitraryKeyId" + final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210" + final String KEY_ID_2 = "arbitraryKeyId2" + final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111" + final String KEY_ID_3 = "arbitraryKeyId3" + final String KEY_HEX_3 = "01010101010101010101010101010101" + + rawProperties.setProperty(PREKID, KEY_ID) + rawProperties.setProperty("${PREK}.id.${KEY_ID}", KEY_HEX) + rawProperties.setProperty("${PREK}.id.${KEY_ID_2}", KEY_HEX_2) + rawProperties.setProperty("${PREK}.id.${KEY_ID_3}", KEY_HEX_3) + rawProperties.setProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS, "some.class.provider") + rawProperties.setProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION, "some://url") + NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties) + logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}") + + // Act + def keyId = niFiProperties.getProvenanceRepoEncryptionKeyId() + def key = niFiProperties.getProvenanceRepoEncryptionKey() + def keys = niFiProperties.getProvenanceRepoEncryptionKeys() + + logger.info("Retrieved key ID: ${keyId}") + logger.info("Retrieved key: ${key}") + logger.info("Retrieved keys: ${keys}") + + // Assert + assert keyId == KEY_ID + assert key == KEY_HEX + assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_2, (KEY_ID_3): KEY_HEX_3] + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/resources/conf/nifi_with_additional_sensitive_keys.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/resources/conf/nifi_with_additional_sensitive_keys.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/resources/conf/nifi_with_additional_sensitive_keys.properties index f775d83..6a88c25 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/resources/conf/nifi_with_additional_sensitive_keys.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/resources/conf/nifi_with_additional_sensitive_keys.properties @@ -73,7 +73,7 @@ nifi.web.jetty.working.directory=./target/work/jetty nifi.sensitive.props.key=key nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL nifi.sensitive.props.provider=BC -nifi.sensitive.props.additional.keys=nifi.ui.banner.text +nifi.sensitive.props.additional.keys=nifi.ui.banner.text, nifi.version, nifi.sensitive.props.additional.keys nifi.security.keystore= nifi.security.keystoreType= http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index d6b1aaf..0baaed7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -91,6 +91,11 @@ <!-- persistent provenance repository properties --> <nifi.provenance.repository.implementation>org.apache.nifi.provenance.PersistentProvenanceRepository</nifi.provenance.repository.implementation> + <nifi.provenance.repository.debug.frequency>1_000_000</nifi.provenance.repository.debug.frequency> + <nifi.provenance.repository.encryption.key.provider.implementation/> + <nifi.provenance.repository.encryption.key.provider.location/> + <nifi.provenance.repository.encryption.key.id/> + <nifi.provenance.repository.encryption.key/> <nifi.provenance.repository.directory.default>./provenance_repository</nifi.provenance.repository.directory.default> <nifi.provenance.repository.max.storage.time>24 hours</nifi.provenance.repository.max.storage.time> <nifi.provenance.repository.max.storage.size>1 GB</nifi.provenance.repository.max.storage.size> http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 62b4c8f..dadc5e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -81,6 +81,11 @@ nifi.content.viewer.url=${nifi.content.viewer.url} # Provenance Repository Properties nifi.provenance.repository.implementation=${nifi.provenance.repository.implementation} +nifi.provenance.repository.debug.frequency=${nifi.provenance.repository.debug.frequency} +nifi.provenance.repository.encryption.key.provider.implementation=${nifi.provenance.repository.encryption.key.provider.implementation} +nifi.provenance.repository.encryption.key.provider.location=${nifi.provenance.repository.encryption.key.provider.location} +nifi.provenance.repository.encryption.key.id=${nifi.provenance.repository.encryption.key.id} +nifi.provenance.repository.encryption.key=${nifi.provenance.repository.encryption.key} # Persistent Provenance Repository Properties nifi.provenance.repository.directory.default=${nifi.provenance.repository.directory.default} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/logback-test.xml new file mode 100644 index 0000000..80b8b49 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="INFO"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration> + http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/logback-test.xml new file mode 100644 index 0000000..5afbc8e --- /dev/null +++ b/nifi-nar-bundles/nifi-poi-bundle/nifi-poi-processors/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="WARN"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration> + http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml index 4db4169..8fe5dbf 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml @@ -63,5 +63,10 @@ <artifactId>commons-lang3</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk15on</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java new file mode 100644 index 0000000..fcd7fee --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.provenance.schema.LookupTableEventRecord; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.timebuffer.LongEntityAccess; +import org.apache.nifi.util.timebuffer.TimedBuffer; +import org.apache.nifi.util.timebuffer.TimestampedLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader { + private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReader.class); + + private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; + + private ProvenanceEventEncryptor provenanceEventEncryptor; + + private static final TimedBuffer<TimestampedLong> decryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); + + private int debugFrequency = DEFAULT_DEBUG_FREQUENCY; + public static final int SERIALIZATION_VERSION = 1; + + public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter"; + + public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, + ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException { + this(inputStream, filename, tocReader, maxAttributeChars, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY); + } + + public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, + ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException { + super(inputStream, filename, tocReader, maxAttributeChars); + this.provenanceEventEncryptor = provenanceEventEncryptor; + this.debugFrequency = debugFrequency; + } + + @Override + protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException { + verifySerializationVersion(serializationVersion); + + final long byteOffset = getBytesConsumed(); + final long eventId = in.readInt() + getFirstEventId(); + final int recordLength = in.readInt(); + + return readRecord(in, eventId, byteOffset, recordLength); + } + + private StandardProvenanceEventRecord readRecord(final DataInputStream inputStream, final long eventId, final long startOffset, final int recordLength) throws IOException { + try { + final InputStream limitedIn = new LimitingInputStream(inputStream, recordLength); + + byte[] encryptedSerializedBytes = new byte[recordLength]; + DataInputStream encryptedInputStream = new DataInputStream(limitedIn); + encryptedInputStream.readFully(encryptedSerializedBytes); + + byte[] plainSerializedBytes = decrypt(encryptedSerializedBytes, Long.toString(eventId)); + InputStream plainStream = new ByteArrayInputStream(plainSerializedBytes); + + final Record eventRecord = getRecordReader().readRecord(plainStream); + if (eventRecord == null) { + return null; + } + + final StandardProvenanceEventRecord deserializedEvent = LookupTableEventRecord.getEvent(eventRecord, getFilename(), startOffset, getMaxAttributeLength(), + getFirstEventId(), getSystemTimeOffset(), getComponentIds(), getComponentTypes(), getQueueIds(), getEventTypes()); + deserializedEvent.setEventId(eventId); + return deserializedEvent; + } catch (EncryptionException e) { + logger.error("Encountered an error reading the record: ", e); + throw new IOException(e); + } + } + + // TODO: Copied from EventIdFirstSchemaRecordReader to force local/overridden readRecord() + @Override + protected Optional<StandardProvenanceEventRecord> readToEvent(final long eventId, final DataInputStream dis, final int serializationVersion) throws IOException { + verifySerializationVersion(serializationVersion); + + while (isData(dis)) { + final long startOffset = getBytesConsumed(); + final long id = dis.readInt() + getFirstEventId(); + final int recordLength = dis.readInt(); + + if (id >= eventId) { + final StandardProvenanceEventRecord event = readRecord(dis, id, startOffset, recordLength); + return Optional.ofNullable(event); + } else { + // This is not the record we want. Skip over it instead of deserializing it. + StreamUtils.skip(dis, recordLength); + } + } + + return Optional.empty(); + } + + private byte[] decrypt(byte[] encryptedBytes, String eventId) throws IOException, EncryptionException { + try { + return provenanceEventEncryptor.decrypt(encryptedBytes, eventId); + } catch (Exception e) { + logger.error("Encountered an error: ", e); + throw new EncryptionException(e); + } + } + + @Override + public String toString() { + return getDescription(); + } + + private String getDescription() { + try { + return "EncryptedSchemaRecordReader, toc: " + getTocReader().getFile().getAbsolutePath() + ", journal: " + getFilename(); + } catch (Exception e) { + return "EncryptedSchemaRecordReader@" + Integer.toHexString(this.hashCode()); + } + } + + /** + * Sets the encryptor to use (necessary because the + * {@link org.apache.nifi.provenance.serialization.RecordReaders#newRecordReader(File, Collection, int)} method doesn't accept the encryptor. + * + * @param provenanceEventEncryptor the encryptor + */ + void setProvenanceEventEncryptor(ProvenanceEventEncryptor provenanceEventEncryptor) { + this.provenanceEventEncryptor = provenanceEventEncryptor; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java new file mode 100644 index 0000000..f84ca48 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.security.KeyManagementException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.util.timebuffer.LongEntityAccess; +import org.apache.nifi.util.timebuffer.TimedBuffer; +import org.apache.nifi.util.timebuffer.TimestampedLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter { + private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordWriter.class); + + private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; + + private ProvenanceEventEncryptor provenanceEventEncryptor; + + private static final TimedBuffer<TimestampedLong> encryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); + + private String keyId; + + private int debugFrequency; + public static final int SERIALIZATION_VERSION = 1; + + public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter"; + + public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed, + final int uncompressedBlockSize, final IdentifierLookup idLookup, + ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException, EncryptionException { + this(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY); + } + + public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed, + final int uncompressedBlockSize, final IdentifierLookup idLookup, + ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException, EncryptionException { + super(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup); + this.provenanceEventEncryptor = provenanceEventEncryptor; + this.debugFrequency = debugFrequency; + + try { + this.keyId = getNextAvailableKeyId(); + } catch (KeyManagementException e) { + logger.error("Encountered an error initializing the encrypted schema record writer because the provided encryptor has no valid keys available: ", e); + throw new EncryptionException("No valid keys in the provenance event encryptor", e); + } + } + + @Override + public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException { + final long encryptStart = System.nanoTime(); + byte[] cipherBytes; + try { + byte[] serialized; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); + final DataOutputStream dos = new DataOutputStream(baos)) { + writeRecord(record, 0L, dos); + serialized = baos.toByteArray(); + } + String eventId = record.getBestEventIdentifier(); + cipherBytes = encrypt(serialized, eventId); + } catch (EncryptionException e) { + logger.error("Encountered an error: ", e); + throw new IOException("Error encrypting the provenance record", e); + } + final long encryptStop = System.nanoTime(); + + final long lockStart = System.nanoTime(); + final long writeStart; + final long startBytes; + final long endBytes; + final long recordIdentifier; + synchronized (this) { + writeStart = System.nanoTime(); + try { + recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId(); + startBytes = getBytesWritten(); + + ensureStreamState(recordIdentifier, startBytes); + + final DataOutputStream out = getBufferedOutputStream(); + final int recordIdOffset = (int) (recordIdentifier - getFirstEventId()); + out.writeInt(recordIdOffset); + out.writeInt(cipherBytes.length); + out.write(cipherBytes); + + getRecordCount().incrementAndGet(); + endBytes = getBytesWritten(); + } catch (final IOException ioe) { + markDirty(); + throw ioe; + } + } + + if (logger.isDebugEnabled()) { + // Collect stats and periodically dump them if log level is set to at least info. + final long writeNanos = System.nanoTime() - writeStart; + getWriteTimes().add(new TimestampedLong(writeNanos)); + + final long serializeNanos = lockStart - encryptStart; + getSerializeTimes().add(new TimestampedLong(serializeNanos)); + + final long encryptNanos = encryptStop - encryptStart; + getEncryptTimes().add(new TimestampedLong(encryptNanos)); + + final long lockNanos = writeStart - lockStart; + getLockTimes().add(new TimestampedLong(lockNanos)); + getBytesWrittenBuffer().add(new TimestampedLong(endBytes - startBytes)); + + final long recordCount = getTotalRecordCount().incrementAndGet(); + if (recordCount % debugFrequency == 0) { + printStats(); + } + } + + final long serializedLength = endBytes - startBytes; + final TocWriter tocWriter = getTocWriter(); + final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex(); + final File file = getFile(); + final String storageLocation = file.getParentFile().getName() + "/" + file.getName(); + return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes); + } + + private void printStats() { + final long sixtySecondsAgo = System.currentTimeMillis() - 60000L; + final Long writeNanosLast60 = getWriteTimes().getAggregateValue(sixtySecondsAgo).getValue(); + final Long lockNanosLast60 = getLockTimes().getAggregateValue(sixtySecondsAgo).getValue(); + final Long serializeNanosLast60 = getSerializeTimes().getAggregateValue(sixtySecondsAgo).getValue(); + final Long encryptNanosLast60 = getEncryptTimes().getAggregateValue(sixtySecondsAgo).getValue(); + final Long bytesWrittenLast60 = getBytesWrittenBuffer().getAggregateValue(sixtySecondsAgo).getValue(); + logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events, {} millis encrypting events", + TimeUnit.NANOSECONDS.toMillis(writeNanosLast60), + bytesWrittenLast60 / 1024 / 1024, + TimeUnit.NANOSECONDS.toMillis(lockNanosLast60), + TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60), + TimeUnit.NANOSECONDS.toMillis(encryptNanosLast60)); + } + + static TimedBuffer<TimestampedLong> getEncryptTimes() { + return encryptTimes; + } + + private byte[] encrypt(byte[] serialized, String eventId) throws IOException, EncryptionException { + String keyId = getKeyId(); + try { + return provenanceEventEncryptor.encrypt(serialized, eventId, keyId); + } catch (Exception e) { + logger.error("Encountered an error: ", e); + throw new EncryptionException(e); + } + } + + private String getNextAvailableKeyId() throws KeyManagementException { + return provenanceEventEncryptor.getNextKeyId(); + } + + @Override + protected int getSerializationVersion() { + return SERIALIZATION_VERSION; + } + + @Override + protected String getSerializationName() { + return SERIALIZATION_NAME; + } + + public String getKeyId() { + return keyId; + } + + @Override + public String toString() { + return "EncryptedSchemaRecordWriter" + + " using " + provenanceEventEncryptor + + " and current keyId " + keyId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java new file mode 100644 index 0000000..a2d455b --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.io.IOException; +import java.security.KeyManagementException; +import java.util.Map; +import java.util.stream.Collectors; +import javax.crypto.SecretKey; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.store.EventFileManager; +import org.apache.nifi.provenance.store.RecordReaderFactory; +import org.apache.nifi.provenance.store.RecordWriterFactory; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanceRepository { + private static final Logger logger = LoggerFactory.getLogger(EncryptedWriteAheadProvenanceRepository.class); + + /** + * This constructor exists solely for the use of the Java Service Loader mechanism and should not be used. + */ + public EncryptedWriteAheadProvenanceRepository() { + super(); + } + + public EncryptedWriteAheadProvenanceRepository(final NiFiProperties nifiProperties) { + super(RepositoryConfiguration.create(nifiProperties)); + } + + public EncryptedWriteAheadProvenanceRepository(final RepositoryConfiguration config) { + super(config); + } + + /** + * This method initializes the repository. It first builds the key provider and event encryptor + * from the config values, then creates the encrypted record writer and reader, then delegates + * back to the superclass for the common implementation. + * + * @param eventReporter the event reporter + * @param authorizer the authorizer + * @param resourceFactory the authorizable factory + * @param idLookup the lookup provider + * @throws IOException if there is an error initializing this repository + */ + @Override + public synchronized void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory, + final IdentifierLookup idLookup) throws IOException { + // Initialize the encryption-specific fields + ProvenanceEventEncryptor provenanceEventEncryptor; + if (getConfig().supportsEncryption()) { + try { + KeyProvider keyProvider = buildKeyProvider(); + provenanceEventEncryptor = new AESProvenanceEventEncryptor(); + provenanceEventEncryptor.initialize(keyProvider); + } catch (KeyManagementException e) { + String msg = "Encountered an error building the key provider"; + logger.error(msg, e); + throw new IOException(msg, e); + } + } else { + throw new IOException("The provided configuration does not support a encrypted repository"); + } + + // Build a factory using lambda which injects the encryptor + final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> { + try { + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, BLOCK_SIZE, idLookup, provenanceEventEncryptor, getConfig().getDebugFrequency()); + } catch (EncryptionException e) { + logger.error("Encountered an error building the schema record writer factory: ", e); + throw new IOException(e); + } + }; + + // Build a factory using lambda which injects the encryptor + final EventFileManager fileManager = new EventFileManager(); + final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> { + fileManager.obtainReadLock(file); + try { + EncryptedSchemaRecordReader tempReader = (EncryptedSchemaRecordReader) RecordReaders.newRecordReader(file, logs, maxChars); + tempReader.setProvenanceEventEncryptor(provenanceEventEncryptor); + return tempReader; + } finally { + fileManager.releaseReadLock(file); + } + }; + + // Delegate the init to the parent impl + super.init(recordWriterFactory, recordReaderFactory, eventReporter, authorizer, resourceFactory); + } + + private KeyProvider buildKeyProvider() throws KeyManagementException { + RepositoryConfiguration config = super.getConfig(); + if (config == null) { + throw new KeyManagementException("The repository configuration is missing"); + } + + final String implementationClassName = config.getKeyProviderImplementation(); + if (implementationClassName == null) { + throw new KeyManagementException("Cannot create Key Provider because the NiFi Properties is missing the following property: " + + NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS); + } + + // TODO: Extract to factory + KeyProvider keyProvider; + if (StaticKeyProvider.class.getName().equals(implementationClassName)) { + // Get all the keys (map) from config + if (CryptoUtils.isValidKeyProvider(implementationClassName, config.getKeyProviderLocation(), config.getKeyId(), config.getEncryptionKeys())) { + Map<String, SecretKey> formedKeys = config.getEncryptionKeys().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> { + try { + return CryptoUtils.formKeyFromHex(e.getValue()); + } catch (KeyManagementException e1) { + // This should never happen because the hex has already been validated + logger.error("Encountered an error: ", e1); + return null; + } + })); + keyProvider = new StaticKeyProvider(formedKeys); + } else { + final String msg = "The StaticKeyProvider definition is not valid"; + logger.error(msg); + throw new KeyManagementException(msg); + } + } else if (FileBasedKeyProvider.class.getName().equals(implementationClassName)) { + keyProvider = new FileBasedKeyProvider(config.getKeyProviderLocation()); + if (!keyProvider.keyExists(config.getKeyId())) { + throw new KeyManagementException("The specified key ID " + config.getKeyId() + " is not in the key definition file"); + } + } else { + throw new KeyManagementException("Invalid key provider implementation provided: " + implementationClassName); + } + + return keyProvider; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java index 612b6c8..bd85846 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Optional; - import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; import org.apache.nifi.provenance.serialization.CompressableRecordReader; @@ -35,6 +34,14 @@ import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; public class EventIdFirstSchemaRecordReader extends CompressableRecordReader { + RecordSchema getSchema() { + return schema; + } + + SchemaRecordReader getRecordReader() { + return recordReader; + } + private RecordSchema schema; // effectively final private SchemaRecordReader recordReader; // effectively final @@ -43,16 +50,41 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader { private List<String> queueIds; private List<String> eventTypes; private long firstEventId; + + List<String> getComponentIds() { + return componentIds; + } + + List<String> getComponentTypes() { + return componentTypes; + } + + List<String> getQueueIds() { + return queueIds; + } + + List<String> getEventTypes() { + return eventTypes; + } + + long getFirstEventId() { + return firstEventId; + } + + long getSystemTimeOffset() { + return systemTimeOffset; + } + private long systemTimeOffset; public EventIdFirstSchemaRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException { super(in, filename, tocReader, maxAttributeChars); } - private void verifySerializationVersion(final int serializationVersion) { + protected void verifySerializationVersion(final int serializationVersion) { if (serializationVersion > EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION) { throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion - + " and supported versions are 1-" + EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION); + + " and supported versions are 1-" + EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION); } } @@ -109,12 +141,12 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader { } final StandardProvenanceEventRecord deserializedEvent = LookupTableEventRecord.getEvent(eventRecord, getFilename(), startOffset, getMaxAttributeLength(), - firstEventId, systemTimeOffset, componentIds, componentTypes, queueIds, eventTypes); + firstEventId, systemTimeOffset, componentIds, componentTypes, queueIds, eventTypes); deserializedEvent.setEventId(eventId); return deserializedEvent; } - private boolean isData(final InputStream in) throws IOException { + protected boolean isData(final InputStream in) throws IOException { in.mark(1); final int nextByte = in.read(); in.reset(); @@ -142,4 +174,17 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader { return Optional.empty(); } + + @Override + public String toString() { + return getDescription(); + } + + private String getDescription() { + try { + return "EventIdFirstSchemaRecordReader, toc: " + getTocReader().getFile().getAbsolutePath() + ", journal: " + getFilename(); + } catch (Exception e) { + return "EventIdFirstSchemaRecordReader@" + Integer.toHexString(this.hashCode()); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java index bb8d52f..8f5b2b2 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; @@ -238,4 +237,46 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { return SERIALIZATION_NAME; } + /* Getters for internal state written to by subclass EncryptedSchemaRecordWriter */ + + IdentifierLookup getIdLookup() { + return idLookup; + } + + SchemaRecordWriter getSchemaRecordWriter() { + return schemaRecordWriter; + } + + AtomicInteger getRecordCount() { + return recordCount; + } + + static TimedBuffer<TimestampedLong> getSerializeTimes() { + return serializeTimes; + } + + static TimedBuffer<TimestampedLong> getLockTimes() { + return lockTimes; + } + + static TimedBuffer<TimestampedLong> getWriteTimes() { + return writeTimes; + } + + static TimedBuffer<TimestampedLong> getBytesWrittenBuffer() { + return bytesWritten; + } + + static AtomicLong getTotalRecordCount() { + return totalRecordCount; + } + + long getFirstEventId() { + return firstEventId; + } + + long getSystemTimeOffset() { + return systemTimeOffset; + } + }
