NIFI-30: Merged code into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7db27c01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7db27c01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7db27c01 Branch: refs/heads/develop Commit: 7db27c01b2e1f0e1aa66ee3a861997e24b618502 Parents: 262a873 a4285e9 Author: Mark Payne <[email protected]> Authored: Fri May 1 10:28:26 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri May 1 10:28:26 2015 -0400 ---------------------------------------------------------------------- .../nifi/security/util/EncryptionMethod.java | 5 +- .../nifi-standard-processors/pom.xml | 6 + .../processors/standard/EncryptContent.java | 586 ++++++++++--------- .../standard/util/OpenPGPKeyBasedEncryptor.java | 277 +++++++++ .../util/OpenPGPPasswordBasedEncryptor.java | 175 ++++++ .../standard/util/PasswordBasedEncryptor.java | 155 +++++ .../additionalDetails.html | 30 + .../processors/standard/TestEncryptContent.java | 248 +++++--- .../test/resources/TestEncryptContent/text.txt | 17 + .../resources/TestEncryptContent/text.txt.asc | 33 ++ nifi/pom.xml | 5 + 11 files changed, 1169 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7db27c01/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7db27c01/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index 7fe9fbc,ed4cc0d..6492d0a --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@@ -1,282 -1,306 +1,304 @@@ - /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.nifi.processors.standard; - - import org.apache.nifi.processor.ProcessContext; - import org.apache.nifi.processor.AbstractProcessor; - import org.apache.nifi.processor.ProcessorInitializationContext; - import org.apache.nifi.processor.ProcessSession; - import org.apache.nifi.processor.Relationship; - import org.apache.nifi.components.PropertyDescriptor; - import org.apache.nifi.flowfile.FlowFile; - import org.apache.nifi.stream.io.StreamUtils; - import org.apache.nifi.logging.ProcessorLog; - import org.apache.nifi.annotation.documentation.CapabilityDescription; - import org.apache.nifi.annotation.behavior.EventDriven; - import org.apache.nifi.annotation.behavior.SideEffectFree; - import org.apache.nifi.annotation.behavior.SupportsBatching; - import org.apache.nifi.annotation.documentation.Tags; - import org.apache.nifi.processor.exception.ProcessException; - import org.apache.nifi.processor.io.StreamCallback; - import org.apache.nifi.processor.util.StandardValidators; - import org.apache.nifi.security.util.EncryptionMethod; - import org.apache.nifi.util.StopWatch; - - import org.bouncycastle.jce.provider.BouncyCastleProvider; - - import javax.crypto.spec.PBEKeySpec; - import javax.crypto.spec.PBEParameterSpec; - - import java.io.IOException; - import java.io.InputStream; - import java.io.OutputStream; - import java.security.InvalidAlgorithmParameterException; - import java.security.InvalidKeyException; - import java.security.SecureRandom; - import java.security.Security; - import java.text.Normalizer; - import java.util.ArrayList; - import java.util.Collections; - import java.util.HashSet; - import java.util.List; - import java.util.Set; - import java.util.concurrent.TimeUnit; - import javax.crypto.BadPaddingException; - import javax.crypto.Cipher; - import javax.crypto.IllegalBlockSizeException; - import javax.crypto.SecretKey; - import javax.crypto.SecretKeyFactory; - - @EventDriven - @SideEffectFree - @SupportsBatching - @Tags({"encryption", "decryption", "password", "JCE"}) - @CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly generated salt") - public class EncryptContent extends AbstractProcessor { - - public static final String ENCRYPT_MODE = "Encrypt"; - public static final String DECRYPT_MODE = "Decrypt"; - public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG"; - public static final int DEFAULT_SALT_SIZE = 8; - - public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() - .name("Mode") - .description("Specifies whether the content should be encrypted or decrypted") - .required(true) - .allowableValues(ENCRYPT_MODE, DECRYPT_MODE) - .defaultValue(ENCRYPT_MODE) - .build(); - public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder() - .name("Encryption Algorithm") - .description("The Encryption Algorithm to use") - .required(true) - .allowableValues(EncryptionMethod.values()) - .defaultValue(EncryptionMethod.MD5_256AES.name()) - .build(); - public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("The Password to use for encrypting or decrypting the data") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully encrypted or decrypted will be routed to success") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure") - .build(); - - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; - - static { - Security.addProvider(new BouncyCastleProvider()); - } - - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(MODE); - properties.add(ENCRYPTION_ALGORITHM); - properties.add(PASSWORD); - this.properties = Collections.unmodifiableList(properties); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ProcessorLog logger = getLogger(); - final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); - final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method); - final String providerName = encryptionMethod.getProvider(); - final String algorithm = encryptionMethod.getAlgorithm(); - - final String password = context.getProperty(PASSWORD).getValue(); - final char[] normalizedPassword = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); - final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword); - - final SecureRandom secureRandom; - final SecretKeyFactory factory; - final SecretKey secretKey; - final Cipher cipher; - try { - secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); - secureRandom.setSeed(System.currentTimeMillis()); - factory = SecretKeyFactory.getInstance(algorithm, providerName); - secretKey = factory.generateSecret(pbeKeySpec); - cipher = Cipher.getInstance(algorithm, providerName); - } catch (final Exception e) { - logger.error("failed to initialize Encryption/Decryption algorithm due to {}", new Object[]{e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final int algorithmBlockSize = cipher.getBlockSize(); - final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE; - - final StopWatch stopWatch = new StopWatch(true); - final String mode = context.getProperty(MODE).getValue(); - try { - if (mode.equalsIgnoreCase(ENCRYPT_MODE)) { - final byte[] salt = new byte[saltSize]; - secureRandom.nextBytes(salt); - - final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000); - try { - cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec); - } catch (final InvalidKeyException | InvalidAlgorithmParameterException e) { - logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - flowFile = session.write(flowFile, new EncryptCallback(cipher, salt)); - logger.info("Successfully encrypted {}", new Object[]{flowFile}); - } else { - if (flowFile.getSize() <= saltSize) { - logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize)); - logger.info("successfully decrypted {}", new Object[]{flowFile}); - } - - session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } catch (final ProcessException pe) { - getLogger().error("Failed to {} {} due to {}; routing to failure", new Object[] {mode, flowFile, pe}); - session.transfer(flowFile, REL_FAILURE); - } - } - - private static class DecryptCallback implements StreamCallback { - - private final Cipher cipher; - private final SecretKey secretKey; - private final int saltSize; - - public DecryptCallback(final Cipher cipher, final SecretKey secretKey, final int saltSize) { - this.cipher = cipher; - this.secretKey = secretKey; - this.saltSize = saltSize; - } - - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - final byte[] salt = new byte[saltSize]; - StreamUtils.fillBuffer(in, salt); - - final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000); - try { - cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec); - } catch (final Exception e) { - throw new ProcessException(e); - } - - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - final byte[] decryptedBytes = cipher.update(buffer, 0, len); - if (decryptedBytes != null) { - out.write(decryptedBytes); - } - } - - try { - out.write(cipher.doFinal()); - } catch (final Exception e) { - throw new ProcessException(e); - } - } - } - - private static class EncryptCallback implements StreamCallback { - - private final Cipher cipher; - private final byte[] salt; - - public EncryptCallback(final Cipher cipher, final byte[] salt) { - this.cipher = cipher; - this.salt = salt; - } - - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - out.write(salt); - - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - final byte[] encryptedBytes = cipher.update(buffer, 0, len); - if (encryptedBytes != null) { - out.write(encryptedBytes); - } - } - - try { - out.write(cipher.doFinal()); - } catch (final IllegalBlockSizeException | BadPaddingException e) { - throw new ProcessException(e); - } - } - } - } + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.security.Security; + import java.text.Normalizer; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + + import org.apache.nifi.annotation.behavior.EventDriven; + import org.apache.nifi.annotation.behavior.SideEffectFree; + import org.apache.nifi.annotation.behavior.SupportsBatching; + import org.apache.nifi.annotation.documentation.CapabilityDescription; + import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.processors.standard.util.OpenPGPKeyBasedEncryptor; + import org.apache.nifi.processors.standard.util.OpenPGPPasswordBasedEncryptor; + import org.apache.nifi.processors.standard.util.PasswordBasedEncryptor; + import org.apache.nifi.security.util.EncryptionMethod; + import org.apache.nifi.util.StopWatch; + import org.bouncycastle.jce.provider.BouncyCastleProvider; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"encryption", "decryption", "password", "JCE", "OpenPGP", "PGP", "GPG"}) + @CapabilityDescription("Encrypts or Decrypts a FlowFile using either symmetric encryption with a password and randomly generated salt, or asymmetric encryption using a public and secret key.") + public class EncryptContent extends AbstractProcessor { + + public static final String ENCRYPT_MODE = "Encrypt"; + public static final String DECRYPT_MODE = "Decrypt"; + + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .description("Specifies whether the content should be encrypted or decrypted") + .required(true) + .allowableValues(ENCRYPT_MODE, DECRYPT_MODE) + .defaultValue(ENCRYPT_MODE) + .build(); + public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder() + .name("Encryption Algorithm") + .description("The Encryption Algorithm to use") + .required(true) + .allowableValues(EncryptionMethod.values()) + .defaultValue(EncryptionMethod.MD5_256AES.name()) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The Password to use for encrypting or decrypting the data") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PUBLIC_KEYRING = new PropertyDescriptor.Builder() + .name("public-keyring-file") + .displayName("Public Keyring File") + .description("In a PGP encrypt mode, this keyring contains the public key of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PUBLIC_KEY_USERID = new PropertyDescriptor.Builder() + .name("public-key-user-id") + .displayName("Public Key User Id") + .description("In a PGP encrypt mode, this user id of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PRIVATE_KEYRING = new PropertyDescriptor.Builder() + .name("private-keyring-file") + .displayName("Private Keyring File") + .description("In a PGP decrypt mode, this keyring contains the private key of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PRIVATE_KEYRING_PASSPHRASE = new PropertyDescriptor.Builder() + .name("private-keyring-passphrase") + .displayName("Private Keyring Passphrase") + .description("In a PGP decrypt mode, this is the private keyring passphrase") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + static { + // add BouncyCastle encryption providers + Security.addProvider(new BouncyCastleProvider()); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(MODE); + properties.add(ENCRYPTION_ALGORITHM); + properties.add(PASSWORD); + properties.add(PUBLIC_KEYRING); + properties.add(PUBLIC_KEY_USERID); + properties.add(PRIVATE_KEYRING); + properties.add(PRIVATE_KEYRING_PASSPHRASE); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + - public static boolean isPGPAlgorithm(String algorithm) { ++ public static boolean isPGPAlgorithm(final String algorithm) { + return algorithm.startsWith("PGP"); + } + - public static boolean isPGPArmoredAlgorithm(String algorithm) { ++ public static boolean isPGPArmoredAlgorithm(final String algorithm) { + return isPGPAlgorithm(algorithm) && algorithm.endsWith("ASCII-ARMOR"); + } + + @Override - protected Collection<ValidationResult> customValidate(ValidationContext context) { - List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context)); ++ protected Collection<ValidationResult> customValidate(final ValidationContext context) { ++ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context)); + final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); + final String algorithm = EncryptionMethod.valueOf(method).getAlgorithm(); + final String password = context.getProperty(PASSWORD).getValue(); + if (isPGPAlgorithm(algorithm)) { + if (password == null) { - boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); ++ final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); + if (encrypt) { + // need both public-keyring-file and public-key-user-id set - String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); - String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); ++ final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); ++ final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); + if (publicKeyring == null || publicUserId == null) { + validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation(algorithm + " encryption without a " + PASSWORD.getDisplayName() + " requires both " + + PUBLIC_KEYRING.getDisplayName() + " and " + PUBLIC_KEY_USERID.getDisplayName()) + .build()); + } else { + // verify the public keyring contains the user id + try { + if (OpenPGPKeyBasedEncryptor.getPublicKey(publicUserId, publicKeyring) == null) { + validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation(PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring + + " does not contain user id " + publicUserId) + .build()); + } - } catch (Exception e) { ++ } catch (final Exception e) { + validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation("Invalid " + PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring + + " because " + e.toString()) + .build()); + } + } + } else { + // need both private-keyring-file and private-keyring-passphrase set - String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); - String keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue(); ++ final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); ++ final String keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue(); + if (privateKeyring == null || keyringPassphrase == null) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getName()) + .explanation(algorithm + " decryption without a " + PASSWORD.getDisplayName() + " requires both " + + PRIVATE_KEYRING.getDisplayName() + " and " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName()) + .build()); + } else { + final String providerName = EncryptionMethod.valueOf(method).getProvider(); + // verify the passphrase works on the private keyring + try { + if (!OpenPGPKeyBasedEncryptor.validateKeyring(providerName, privateKeyring, keyringPassphrase.toCharArray())) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName()) + .explanation(PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring + + " could not be opened with the provided " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName()) + .build()); + } - } catch (Exception e) { ++ } catch (final Exception e) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName()) + .explanation("Invalid " + PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring + + " because " + e.toString()) + .build()); + } + } + } + } - } else { - if (password == null) { - validationResults.add(new ValidationResult.Builder().subject(PASSWORD.getName()) - .explanation(PASSWORD.getDisplayName() + " is required when using algorithm " + algorithm).build()); - } ++ } else if (password == null) { ++ validationResults.add(new ValidationResult.Builder().subject(PASSWORD.getName()) ++ .explanation(PASSWORD.getDisplayName() + " is required when using algorithm " + algorithm).build()); + } + return validationResults; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); + final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method); + final String providerName = encryptionMethod.getProvider(); + final String algorithm = encryptionMethod.getAlgorithm(); + final String password = context.getProperty(PASSWORD).getValue(); - boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); ++ final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); + + Encryptor encryptor; + StreamCallback callback; + try { + if (isPGPAlgorithm(algorithm)) { - String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); - String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); ++ final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); ++ final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); ++ final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); + if (encrypt && publicKeyring != null) { - String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); ++ final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); + encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, publicKeyring, publicUserId, null, filename); + } else if (!encrypt && privateKeyring != null) { - char[] keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue().toCharArray(); ++ final char[] keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue().toCharArray(); + encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, privateKeyring, null, keyringPassphrase, + filename); + } else { - char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); ++ final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); + encryptor = new OpenPGPPasswordBasedEncryptor(algorithm, providerName, passphrase, filename); + } + } else { - char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); ++ final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); + encryptor = new PasswordBasedEncryptor(algorithm, providerName, passphrase); + } + + if (encrypt) { + callback = encryptor.getEncryptionCallback(); + } else { + callback = encryptor.getDecryptionCallback(); + } + - } catch (Exception e) { ++ } catch (final Exception e) { + logger.error("Failed to initialize {}cryption algorithm because - ", new Object[] { encrypt ? "en" : "de", e }); + session.rollback(); + context.yield(); + return; + } + + try { + final StopWatch stopWatch = new StopWatch(true); + flowFile = session.write(flowFile, callback); + logger.info("successfully {}crypted {}", new Object[] { encrypt ? "en" : "de", flowFile }); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); - } catch (ProcessException e) { ++ } catch (final ProcessException e) { + logger.error("Cannot {}crypt {} - ", new Object[] { encrypt ? "en" : "de", flowFile, e }); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + public static interface Encryptor { + public StreamCallback getEncryptionCallback() throws Exception; + + public StreamCallback getDecryptionCallback() throws Exception; + } + -} ++} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7db27c01/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java index 59ac975,0ea7261..a5581b3 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java @@@ -1,85 -1,163 +1,163 @@@ - /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.nifi.processors.standard; - - import java.io.File; - import java.io.IOException; - import java.nio.file.Paths; - - import org.apache.nifi.security.util.EncryptionMethod; - import org.apache.nifi.util.MockFlowFile; - import org.apache.nifi.util.TestRunner; - import org.apache.nifi.util.TestRunners; - - import org.junit.Test; - - public class TestEncryptContent { - - @Test - public void testRoundTrip() throws IOException { - final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent()); - testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); - - for (final EncryptionMethod method : EncryptionMethod.values()) { - if (method.isUnlimitedStrength()) { - continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default. - } - testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name()); - testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); - - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); - testRunner.clearTransferState(); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - - MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); - testRunner.assertQueueEmpty(); - - testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - testRunner.enqueue(flowFile); - testRunner.clearTransferState(); - testRunner.run(); - testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - - flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); - } - } - - @Test - public void testDecryptNonEncryptedFile() throws IOException { - final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent()); - testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); - - for (final EncryptionMethod method : EncryptionMethod.values()) { - if (method.isUnlimitedStrength()) { - continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default. - } - - testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name()); - testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); - - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); - testRunner.clearTransferState(); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1); - } - } - - } + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.IOException; + import java.nio.file.Paths; + import java.util.Collection; + import java.util.HashSet; + + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.security.util.EncryptionMethod; + import org.apache.nifi.util.MockFlowFile; + import org.apache.nifi.util.MockProcessContext; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; + import org.junit.Assert; + import org.junit.Test; + + public class TestEncryptContent { + + @Test + public void testRoundTrip() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncryptContent()); + testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + + for (final EncryptionMethod method : EncryptionMethod.values()) { + if (method.isUnlimitedStrength()) { + continue; // cannot test unlimited strength in unit tests because it's not enabled by the JVM by default. + } + testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, method.name()); + testRunner.setProperty(EncryptContent.MODE, EncryptContent.ENCRYPT_MODE); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + testRunner.assertQueueEmpty(); + + testRunner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + testRunner.enqueue(flowFile); + testRunner.clearTransferState(); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); + + flowFile = testRunner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + } + } + + @Test + public void testDecryptSmallerThanSaltSize() { - TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); ++ final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.enqueue(new byte[4]); + runner.run(); + runner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1); + } + + @Test + public void testPGPDecrypt() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP_ASCII_ARMOR.name()); + runner.setProperty(EncryptContent.PASSWORD, "Hello, World!"); + + runner.enqueue(Paths.get("src/test/resources/TestEncryptContent/text.txt.asc")); + runner.run(); + + runner.assertAllFlowFilesTransferred(EncryptContent.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); ++ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(EncryptContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/TestEncryptContent/text.txt")); + } + + @Test + public void testValidation() { + final TestRunner runner = TestRunners.newTestRunner(EncryptContent.class); + Collection<ValidationResult> results; + MockProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { ++ for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString() + .contains(EncryptContent.PASSWORD.getDisplayName() + " is required when using algorithm")); + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, EncryptionMethod.PGP.name()); + runner.setProperty(EncryptContent.PUBLIC_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { ++ for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " encryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " + + EncryptContent.PUBLIC_KEYRING.getDisplayName() + " and " + + EncryptContent.PUBLIC_KEY_USERID.getDisplayName())); + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.PUBLIC_KEY_USERID, "USERID"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { ++ for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("does not contain user id USERID")); + } + + runner.removeProperty(EncryptContent.PUBLIC_KEYRING); + runner.removeProperty(EncryptContent.PUBLIC_KEY_USERID); + + results = new HashSet<>(); + runner.setProperty(EncryptContent.MODE, EncryptContent.DECRYPT_MODE); + runner.setProperty(EncryptContent.PRIVATE_KEYRING, "src/test/resources/TestEncryptContent/text.txt"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { ++ for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " decryption without a " + EncryptContent.PASSWORD.getDisplayName() + " requires both " + + EncryptContent.PRIVATE_KEYRING.getDisplayName() + " and " + + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); + + } + + results = new HashSet<>(); + runner.setProperty(EncryptContent.PRIVATE_KEYRING_PASSPHRASE, "PASSWORD"); + runner.enqueue(new byte[0]); + pc = (MockProcessContext) runner.getProcessContext(); + results = pc.validate(); + Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { ++ for (final ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains( + " could not be opened with the provided " + EncryptContent.PRIVATE_KEYRING_PASSPHRASE.getDisplayName())); + + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7db27c01/nifi/pom.xml ----------------------------------------------------------------------
