http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index be0fc67..e989208 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -81,6 +81,10 @@ <artifactId>bcprov-jdk16</artifactId> </dependency> <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpg-jdk16</artifactId> + </dependency> + <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> @@ -187,6 +191,8 @@ <exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude> <exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude> <exclude>src/test/resources/ScanAttribute/dictionary1</exclude> + <exclude>src/test/resources/TestEncryptContent/text.txt</exclude> + <exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude> <exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude> <exclude>src/test/resources/TestJson/json-sample.json</exclude> <exclude>src/test/resources/TestMergeContent/demarcate</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java new file mode 100644 index 0000000..7400821 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +@EventDriven +@SupportsBatching +@Tags({"test", "load", "duplicate"}) +@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile") +public class DuplicateFlowFile extends AbstractProcessor { + + static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder() + .name("Number of Copies") + .description("Specifies how many copies of each incoming FlowFile will be made") + .required(true) + .expressionLanguageSupported(false) + .defaultValue("100") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The original FlowFile and all copies will be sent to this relationship") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Collections.singletonList(NUM_COPIES); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final 
FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) { + final FlowFile copy = session.clone(flowFile); + session.transfer(copy, REL_SUCCESS); + } + + session.transfer(flowFile, REL_SUCCESS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index c0f6301..6492d0a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@ -1,276 +1,304 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.security.util.EncryptionMethod; -import org.apache.nifi.util.StopWatch; - -import org.bouncycastle.jce.provider.BouncyCastleProvider; - -import javax.crypto.spec.PBEKeySpec; -import javax.crypto.spec.PBEParameterSpec; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.InvalidAlgorithmParameterException; -import java.security.InvalidKeyException; -import java.security.SecureRandom; -import java.security.Security; -import java.text.Normalizer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import javax.crypto.BadPaddingException; -import javax.crypto.Cipher; -import javax.crypto.IllegalBlockSizeException; -import javax.crypto.SecretKey; -import javax.crypto.SecretKeyFactory; - 
-@EventDriven -@SideEffectFree -@SupportsBatching -@Tags({"encryption", "decryption", "password", "JCE"}) -@CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly generated salt") -public class EncryptContent extends AbstractProcessor { - - public static final String ENCRYPT_MODE = "Encrypt"; - public static final String DECRYPT_MODE = "Decrypt"; - public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG"; - public static final int DEFAULT_SALT_SIZE = 8; - - public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() - .name("Mode") - .description("Specifies whether the content should be encrypted or decrypted") - .required(true) - .allowableValues(ENCRYPT_MODE, DECRYPT_MODE) - .defaultValue(ENCRYPT_MODE) - .build(); - public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder() - .name("Encryption Algorithm") - .description("The Encryption Algorithm to use") - .required(true) - .allowableValues(EncryptionMethod.values()) - .defaultValue(EncryptionMethod.MD5_256AES.name()) - .build(); - public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("The Password to use for encrypting or decrypting the data") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully encrypted or decrypted will be routed to success") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure") - .build(); - - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; - - static { - Security.addProvider(new BouncyCastleProvider()); - } - - @Override - protected void init(final 
ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(MODE); - properties.add(ENCRYPTION_ALGORITHM); - properties.add(PASSWORD); - this.properties = Collections.unmodifiableList(properties); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ProcessorLog logger = getLogger(); - final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); - final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method); - final String providerName = encryptionMethod.getProvider(); - final String algorithm = encryptionMethod.getAlgorithm(); - - final String password = context.getProperty(PASSWORD).getValue(); - final char[] normalizedPassword = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); - final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword); - - final SecureRandom secureRandom; - final SecretKeyFactory factory; - final SecretKey secretKey; - final Cipher cipher; - try { - secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); - secureRandom.setSeed(System.currentTimeMillis()); - factory = SecretKeyFactory.getInstance(algorithm, providerName); - secretKey = factory.generateSecret(pbeKeySpec); - cipher = Cipher.getInstance(algorithm, providerName); - } catch (final Exception e) { - logger.error("failed to initialize Encryption/Decryption algorithm due to {}", new Object[]{e}); - session.transfer(flowFile, 
REL_FAILURE); - return; - } - - final int algorithmBlockSize = cipher.getBlockSize(); - final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE; - - final StopWatch stopWatch = new StopWatch(true); - if (context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) { - final byte[] salt = new byte[saltSize]; - secureRandom.nextBytes(salt); - - final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000); - try { - cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec); - } catch (final InvalidKeyException | InvalidAlgorithmParameterException e) { - logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - flowFile = session.write(flowFile, new EncryptCallback(cipher, salt)); - logger.info("Successfully encrypted {}", new Object[]{flowFile}); - } else { - if (flowFile.getSize() <= saltSize) { - logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize)); - logger.info("successfully decrypted {}", new Object[]{flowFile}); - } - - session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } - - private static class DecryptCallback implements StreamCallback { - - private final Cipher cipher; - private final SecretKey secretKey; - private final int saltSize; - - public DecryptCallback(final Cipher cipher, final SecretKey secretKey, final int saltSize) { - this.cipher = cipher; - this.secretKey = secretKey; - this.saltSize = saltSize; - } - - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - final byte[] salt = new byte[saltSize]; - StreamUtils.fillBuffer(in, salt); - - final PBEParameterSpec parameterSpec = 
new PBEParameterSpec(salt, 1000); - try { - cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec); - } catch (final Exception e) { - throw new ProcessException(e); - } - - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - final byte[] decryptedBytes = cipher.update(buffer, 0, len); - if (decryptedBytes != null) { - out.write(decryptedBytes); - } - } - - try { - out.write(cipher.doFinal()); - } catch (final Exception e) { - throw new ProcessException(e); - } - } - } - - private static class EncryptCallback implements StreamCallback { - - private final Cipher cipher; - private final byte[] salt; - - public EncryptCallback(final Cipher cipher, final byte[] salt) { - this.cipher = cipher; - this.salt = salt; - } - - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - out.write(salt); - - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - final byte[] encryptedBytes = cipher.update(buffer, 0, len); - if (encryptedBytes != null) { - out.write(encryptedBytes); - } - } - - try { - out.write(cipher.doFinal()); - } catch (final IllegalBlockSizeException | BadPaddingException e) { - throw new ProcessException(e); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.security.Security; +import java.text.Normalizer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.OpenPGPKeyBasedEncryptor; +import org.apache.nifi.processors.standard.util.OpenPGPPasswordBasedEncryptor; +import org.apache.nifi.processors.standard.util.PasswordBasedEncryptor; +import org.apache.nifi.security.util.EncryptionMethod; +import org.apache.nifi.util.StopWatch; +import org.bouncycastle.jce.provider.BouncyCastleProvider; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"encryption", "decryption", "password", "JCE", "OpenPGP", "PGP", 
"GPG"}) +@CapabilityDescription("Encrypts or Decrypts a FlowFile using either symmetric encryption with a password and randomly generated salt, or asymmetric encryption using a public and secret key.") +public class EncryptContent extends AbstractProcessor { + + public static final String ENCRYPT_MODE = "Encrypt"; + public static final String DECRYPT_MODE = "Decrypt"; + + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .description("Specifies whether the content should be encrypted or decrypted") + .required(true) + .allowableValues(ENCRYPT_MODE, DECRYPT_MODE) + .defaultValue(ENCRYPT_MODE) + .build(); + public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder() + .name("Encryption Algorithm") + .description("The Encryption Algorithm to use") + .required(true) + .allowableValues(EncryptionMethod.values()) + .defaultValue(EncryptionMethod.MD5_256AES.name()) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The Password to use for encrypting or decrypting the data") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PUBLIC_KEYRING = new PropertyDescriptor.Builder() + .name("public-keyring-file") + .displayName("Public Keyring File") + .description("In a PGP encrypt mode, this keyring contains the public key of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PUBLIC_KEY_USERID = new PropertyDescriptor.Builder() + .name("public-key-user-id") + .displayName("Public Key User Id") + .description("In a PGP encrypt mode, this user id of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PRIVATE_KEYRING = new 
PropertyDescriptor.Builder() + .name("private-keyring-file") + .displayName("Private Keyring File") + .description("In a PGP decrypt mode, this keyring contains the private key of the recipient") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PRIVATE_KEYRING_PASSPHRASE = new PropertyDescriptor.Builder() + .name("private-keyring-passphrase") + .displayName("Private Keyring Passphrase") + .description("In a PGP decrypt mode, this is the private keyring passphrase") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build(); + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + static { + // add BouncyCastle encryption providers + Security.addProvider(new BouncyCastleProvider()); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(MODE); + properties.add(ENCRYPTION_ALGORITHM); + properties.add(PASSWORD); + properties.add(PUBLIC_KEYRING); + properties.add(PUBLIC_KEY_USERID); + properties.add(PRIVATE_KEYRING); + properties.add(PRIVATE_KEYRING_PASSPHRASE); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; 
+ } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + public static boolean isPGPAlgorithm(final String algorithm) { + return algorithm.startsWith("PGP"); + } + + public static boolean isPGPArmoredAlgorithm(final String algorithm) { + return isPGPAlgorithm(algorithm) && algorithm.endsWith("ASCII-ARMOR"); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context)); + final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); + final String algorithm = EncryptionMethod.valueOf(method).getAlgorithm(); + final String password = context.getProperty(PASSWORD).getValue(); + if (isPGPAlgorithm(algorithm)) { + if (password == null) { + final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); + if (encrypt) { + // need both public-keyring-file and public-key-user-id set + final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); + final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); + if (publicKeyring == null || publicUserId == null) { + validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation(algorithm + " encryption without a " + PASSWORD.getDisplayName() + " requires both " + + PUBLIC_KEYRING.getDisplayName() + " and " + PUBLIC_KEY_USERID.getDisplayName()) + .build()); + } else { + // verify the public keyring contains the user id + try { + if (OpenPGPKeyBasedEncryptor.getPublicKey(publicUserId, publicKeyring) == null) { + validationResults.add(new ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation(PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring + + " does not contain user id " + publicUserId) + .build()); + } + } catch (final Exception e) { + validationResults.add(new 
ValidationResult.Builder().subject(PUBLIC_KEYRING.getDisplayName()) + .explanation("Invalid " + PUBLIC_KEYRING.getDisplayName() + " " + publicKeyring + + " because " + e.toString()) + .build()); + } + } + } else { + // need both private-keyring-file and private-keyring-passphrase set + final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); + final String keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue(); + if (privateKeyring == null || keyringPassphrase == null) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getName()) + .explanation(algorithm + " decryption without a " + PASSWORD.getDisplayName() + " requires both " + + PRIVATE_KEYRING.getDisplayName() + " and " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName()) + .build()); + } else { + final String providerName = EncryptionMethod.valueOf(method).getProvider(); + // verify the passphrase works on the private keyring + try { + if (!OpenPGPKeyBasedEncryptor.validateKeyring(providerName, privateKeyring, keyringPassphrase.toCharArray())) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName()) + .explanation(PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring + + " could not be opened with the provided " + PRIVATE_KEYRING_PASSPHRASE.getDisplayName()) + .build()); + } + } catch (final Exception e) { + validationResults.add(new ValidationResult.Builder().subject(PRIVATE_KEYRING.getDisplayName()) + .explanation("Invalid " + PRIVATE_KEYRING.getDisplayName() + " " + privateKeyring + + " because " + e.toString()) + .build()); + } + } + } + } + } else if (password == null) { + validationResults.add(new ValidationResult.Builder().subject(PASSWORD.getName()) + .explanation(PASSWORD.getDisplayName() + " is required when using algorithm " + algorithm).build()); + } + return validationResults; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + 
FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue(); + final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method); + final String providerName = encryptionMethod.getProvider(); + final String algorithm = encryptionMethod.getAlgorithm(); + final String password = context.getProperty(PASSWORD).getValue(); + final boolean encrypt = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE); + + Encryptor encryptor; + StreamCallback callback; + try { + if (isPGPAlgorithm(algorithm)) { + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String publicKeyring = context.getProperty(PUBLIC_KEYRING).getValue(); + final String privateKeyring = context.getProperty(PRIVATE_KEYRING).getValue(); + if (encrypt && publicKeyring != null) { + final String publicUserId = context.getProperty(PUBLIC_KEY_USERID).getValue(); + encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, publicKeyring, publicUserId, null, filename); + } else if (!encrypt && privateKeyring != null) { + final char[] keyringPassphrase = context.getProperty(PRIVATE_KEYRING_PASSPHRASE).getValue().toCharArray(); + encryptor = new OpenPGPKeyBasedEncryptor(algorithm, providerName, privateKeyring, null, keyringPassphrase, + filename); + } else { + final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); + encryptor = new OpenPGPPasswordBasedEncryptor(algorithm, providerName, passphrase, filename); + } + } else { + final char[] passphrase = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray(); + encryptor = new PasswordBasedEncryptor(algorithm, providerName, passphrase); + } + + if (encrypt) { + callback = encryptor.getEncryptionCallback(); + } else { + callback = encryptor.getDecryptionCallback(); + } + + } catch (final Exception e) { + logger.error("Failed to 
initialize {}cryption algorithm because - ", new Object[] { encrypt ? "en" : "de", e }); + session.rollback(); + context.yield(); + return; + } + + try { + final StopWatch stopWatch = new StopWatch(true); + flowFile = session.write(flowFile, callback); + logger.info("successfully {}crypted {}", new Object[] { encrypt ? "en" : "de", flowFile }); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final ProcessException e) { + logger.error("Cannot {}crypt {} - ", new Object[] { encrypt ? "en" : "de", flowFile, e }); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + public static interface Encryptor { + public StreamCallback getEncryptionCallback() throws Exception; + + public StreamCallback getDecryptionCallback() throws Exception; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index 424094c..f6085e7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators; public class ExecuteProcess extends AbstractProcessor { public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() 
- .name("Command") - .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Command") + .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder() - .name("Command Arguments") - .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") - .required(false) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Command Arguments") + .description("The arguments to supply to the executable delimited by white space. 
White space can be escaped by enclosing it in double-quotes.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() - .name("Working Directory") - .description("The directory to use as the current working directory when executing the command") - .expressionLanguageSupported(false) - .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) - .required(false) - .build(); + .name("Working Directory") + .description("The directory to use as the current working directory when executing the command") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) + .required(false) + .build(); public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder() - .name("Batch Duration") - .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " - + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " - + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") + .name("Batch Duration") + .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " + + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " + + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") .required(false) .expressionLanguageSupported(false) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder() - .name("Redirect Error Stream") - .description("If true will redirect any error 
stream output of the process to the output stream. " - + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.") + .name("Redirect Error Stream") + .description("If true will redirect any error stream output of the process to the output stream. " + + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.") .required(false) .allowableValues("true", "false") .defaultValue("false") @@ -111,11 +111,14 @@ public class ExecuteProcess extends AbstractProcessor { .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All created FlowFiles are routed to this relationship") - .build(); + .name("success") + .description("All created FlowFiles are routed to this relationship") + .build(); private volatile ExecutorService executor; + private Future<?> longRunningProcess; + private AtomicBoolean failure = new AtomicBoolean(false); + private volatile ProxyOutputStream proxyOut; @Override public Set<Relationship> getRelationships() { @@ -135,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } static List<String> splitArgs(final String input) { @@ -209,15 +212,99 @@ public class ExecuteProcess extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException { + if (proxyOut==null) { + proxyOut = new ProxyOutputStream(getLogger()); + } + + final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); + + final List<String> commandStrings = createCommandStrings(context); + final String commandString = StringUtils.join(commandStrings, " "); + + if (longRunningProcess == null || longRunningProcess.isDone()) { + try { + longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut); + } catch (final IOException ioe) { + getLogger().error("Failed to create process due to {}", new Object[] { ioe }); + context.yield(); + return; + } + } else { + getLogger().info("Read from long running process"); + } + + if (!isScheduled()) { + getLogger().info("User stopped processor; will terminate process immediately"); + longRunningProcess.cancel(true); + return; + } + + // Create a FlowFile that we can write to and set the OutputStream for the FlowFile + // as the delegate for the ProxyOuptutStream, then wait until the process finishes + // or until the specified amount of time + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream flowFileOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(flowFileOut)) { + proxyOut.setDelegate(out); + + if (batchNanos == null) { + // we are not creating batches; wait until process terminates. + // NB!!! Maybe get(long timeout, TimeUnit unit) should + // be used to avoid waiting forever. + try { + longRunningProcess.get(); + } catch (final InterruptedException ie) { + } catch (final ExecutionException ee) { + getLogger().error("Process execution failed due to {}", new Object[] { ee.getCause() }); + } + } else { + // wait the allotted amount of time. 
+ try { + TimeUnit.NANOSECONDS.sleep(batchNanos); + } catch (final InterruptedException ie) { + } + } + + proxyOut.setDelegate(null); // prevent from writing to this + // stream + } + } + }); + + if (flowFile.getSize() == 0L) { + // If no data was written to the file, remove it + session.remove(flowFile); + } else if (failure.get()) { + // If there was a failure processing the output of the Process, remove the FlowFile + session.remove(flowFile); + getLogger().error("Failed to read data from Process, so will not generate FlowFile"); + } else { + // All was good. Generate event and transfer FlowFile. + session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); + getLogger().info("Created {} and routed to success", new Object[] { flowFile }); + session.transfer(flowFile, REL_SUCCESS); + } + + // Commit the session so that the FlowFile is transferred to the next processor + session.commit(); + } + + protected List<String> createCommandStrings(final ProcessContext context) { final String command = context.getProperty(COMMAND).getValue(); final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue()); - final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean(); final List<String> commandStrings = new ArrayList<>(args.size() + 1); commandStrings.add(command); commandStrings.addAll(args); + return commandStrings; + } - final String commandString = StringUtils.join(commandStrings, " "); + protected Future<?> launchProcess(final ProcessContext context, final List<String> commandStrings, final Long batchNanos, + final ProxyOutputStream proxyOut) throws IOException { + + final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean(); final ProcessBuilder builder = new ProcessBuilder(commandStrings); final String workingDirName = context.getProperty(WORKING_DIR).getValue(); @@ -236,24 +323,15 @@ public class ExecuteProcess extends AbstractProcessor { 
builder.environment().putAll(environment); } - final long startNanos = System.nanoTime(); - final Process process; - try { - process = builder.redirectErrorStream(redirectErrorStream).start(); - } catch (final IOException ioe) { - getLogger().error("Failed to create process due to {}", new Object[]{ioe}); - context.yield(); - return; - } - - final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); + getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings }); + final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start(); // Submit task to read error stream from process if (!redirectErrorStream) { executor.submit(new Runnable() { @Override public void run() { - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()))) { while (reader.read() >= 0) { } } catch (final IOException ioe) { @@ -263,19 +341,25 @@ public class ExecuteProcess extends AbstractProcessor { } // Submit task to read output of Process and write to FlowFile. - final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger()); - final AtomicBoolean failure = new AtomicBoolean(false); - final AtomicBoolean finishedCopying = new AtomicBoolean(false); + failure = new AtomicBoolean(false); final Future<?> future = executor.submit(new Callable<Object>() { @Override public Object call() throws IOException { try { if (batchNanos == null) { - // if we aren't batching, just copy the stream from the process to the flowfile. - try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) { + // if we aren't batching, just copy the stream from the + // process to the flowfile. 
+ try (final BufferedInputStream bufferedIn = new BufferedInputStream(newProcess.getInputStream())) { final byte[] buffer = new byte[4096]; int len; while ((len = bufferedIn.read(buffer)) > 0) { + + // NB!!!! Maybe all data should be read from + // input stream in case of !isScheduled() to + // avoid subprocess deadlock? + // (we just don't write data to proxyOut) + // Or because we don't use this subprocess + // anymore anyway, we don't care? if (!isScheduled()) { return null; } @@ -284,12 +368,15 @@ public class ExecuteProcess extends AbstractProcessor { } } } else { - // we are batching, which means that the output of the process is text. It doesn't make sense to grab - // arbitrary batches of bytes from some process and send it along as a piece of data, so we assume that + // we are batching, which means that the output of the + // process is text. It doesn't make sense to grab + // arbitrary batches of bytes from some process and send + // it along as a piece of data, so we assume that // setting a batch during means text. - // Also, we don't want that text to get split up in the middle of a line, so we use BufferedReader + // Also, we don't want that text to get split up in the + // middle of a line, so we use BufferedReader // to read lines of text and write them as lines of text. 
- try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) { String line; while ((line = reader.readLine()) != null) { @@ -305,108 +392,22 @@ public class ExecuteProcess extends AbstractProcessor { failure.set(true); throw ioe; } finally { - finishedCopying.set(true); + int exitCode; + try { + exitCode = newProcess.exitValue(); + } catch (final Exception e) { + exitCode = -99999; + } + getLogger().info("Process finished with exit code {} ", new Object[] { exitCode }); } return null; } }); - // continue to do this loop until both the process has finished and we have finished copying - // the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(), - // there can be data buffered on the InputStream; so we will wait until the stream is empty as well. - int flowFileCount = 0; - while (!finishedCopying.get() || isAlive(process)) { - if (!isScheduled()) { - getLogger().info("User stopped processor; will terminate process immediately"); - process.destroy(); - break; - } - - // Create a FlowFile that we can write to and set the OutputStream for the FlowFile - // as the delegate for the ProxyOuptutStream, then wait until the process finishes - // or until the specified amount of time - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream flowFileOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(flowFileOut)) { - proxyOut.setDelegate(out); - - if (batchNanos == null) { - // we are not creating batches; wait until process terminates. - Integer exitCode = null; - while (exitCode == null) { - try { - exitCode = process.waitFor(); - } catch (final InterruptedException ie) { - } - } - } else { - // wait the allotted amount of time. 
- try { - TimeUnit.NANOSECONDS.sleep(batchNanos); - } catch (final InterruptedException ie) { - } - } - - proxyOut.setDelegate(null); // prevent from writing to this stream - } - } - }); - - if (flowFile.getSize() == 0L) { - // If no data was written to the file, remove it - session.remove(flowFile); - } else if (failure.get()) { - // If there was a failure processing the output of the Process, remove the FlowFile - session.remove(flowFile); - getLogger().error("Failed to read data from Process, so will not generate FlowFile"); - break; - } else { - // All was good. Generate event and transfer FlowFile. - session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); - getLogger().info("Created {} and routed to success", new Object[]{flowFile}); - session.transfer(flowFile, REL_SUCCESS); - flowFileCount++; - } - - // Commit the session so that the FlowFile is transferred to the next processor - session.commit(); - } - - final int exitCode; - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - try { - exitCode = process.waitFor(); - } catch (final InterruptedException ie) { - getLogger().warn("Process was interrupted before finishing"); - return; - } - - try { - future.get(); - } catch (final ExecutionException e) { - getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()}); - } catch (final InterruptedException ie) { - getLogger().error("Interrupted while waiting to copy data form Process to FlowFile"); - return; - } - - getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis}); + return future; } - private boolean isAlive(final Process process) { - // unfortunately, java provides no straight-forward way to test if a Process is alive. - // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7, - // so we have this solution in the mean time. 
- try { - process.exitValue(); - return false; - } catch (final IllegalThreadStateException itse) { - return true; - } - } /** * Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 11e75ed..1654a4f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -185,7 +185,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { static final String LAST_MODIFIED = "LastModified"; static { - SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); + final SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); sdf.setTimeZone(TimeZone.getTimeZone("GMT")); UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L)); } @@ -221,13 +221,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { this.properties = Collections.unmodifiableList(properties); // load etag and lastModified from file - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileInputStream fis = new FileInputStream(httpCache)) { - Properties props = new 
Properties(); + final Properties props = new Properties(); props.load(fis); entityTagRef.set(props.getProperty(ETAG)); lastModifiedRef.set(props.getProperty(LAST_MODIFIED)); - } catch (IOException swallow) { + } catch (final IOException swallow) { } } @@ -242,20 +242,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { entityTagRef.set(""); lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE); } @OnShutdown public void onShutdown() { - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileOutputStream fos = new FileOutputStream(httpCache)) { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(ETAG, entityTagRef.get()); props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); props.store(fos, "GetHTTP file modification values"); - } catch (IOException swallow) { + } catch (final IOException swallow) { } } @@ -287,7 +287,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { keystore.load(in, service.getKeyStorePassword().toCharArray()); } - SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build(); + final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build(); return sslContext; } @@ -310,7 +310,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { try { uri = new URI(url); source = uri.getHost(); - } catch (URISyntaxException swallow) { + } catch (final URISyntaxException swallow) { // this 
won't happen as the url has already been validated } @@ -434,22 +434,22 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { if (timeToPersist < System.currentTimeMillis()) { readLock.unlock(); writeLock.lock(); - if (timeToPersist < System.currentTimeMillis()) { - try { + try { + if (timeToPersist < System.currentTimeMillis()) { timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC; - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileOutputStream fos = new FileOutputStream(httpCache)) { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(ETAG, entityTagRef.get()); props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); props.store(fos, "GetHTTP file modification values"); - } catch (IOException e) { + } catch (final IOException e) { getLogger().error("Failed to persist ETag and LastMod due to " + e, e); } - } finally { - readLock.lock(); - writeLock.unlock(); } + } finally { + readLock.lock(); + writeLock.unlock(); } } } finally { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java new file mode 100644 index 0000000..60e3bf8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPKeyBasedEncryptor.java @@ -0,0 +1,277 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; +import java.util.Date; +import java.util.Iterator; +import java.util.zip.Deflater; + +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.standard.EncryptContent; +import org.apache.nifi.processors.standard.EncryptContent.Encryptor; +import org.bouncycastle.bcpg.ArmoredOutputStream; +import org.bouncycastle.openpgp.PGPCompressedData; +import org.bouncycastle.openpgp.PGPCompressedDataGenerator; +import org.bouncycastle.openpgp.PGPEncryptedData; +import org.bouncycastle.openpgp.PGPEncryptedDataGenerator; +import org.bouncycastle.openpgp.PGPEncryptedDataList; +import org.bouncycastle.openpgp.PGPException; +import org.bouncycastle.openpgp.PGPLiteralData; +import org.bouncycastle.openpgp.PGPLiteralDataGenerator; +import org.bouncycastle.openpgp.PGPObjectFactory; +import org.bouncycastle.openpgp.PGPPrivateKey; +import 
org.bouncycastle.openpgp.PGPPublicKey; +import org.bouncycastle.openpgp.PGPPublicKeyEncryptedData; +import org.bouncycastle.openpgp.PGPPublicKeyRing; +import org.bouncycastle.openpgp.PGPPublicKeyRingCollection; +import org.bouncycastle.openpgp.PGPSecretKey; +import org.bouncycastle.openpgp.PGPSecretKeyRing; +import org.bouncycastle.openpgp.PGPSecretKeyRingCollection; +import org.bouncycastle.openpgp.PGPUtil; + +public class OpenPGPKeyBasedEncryptor implements Encryptor { + + private String algorithm; + private String provider; + private String keyring; + private String userId; + private char[] passphrase; + private String filename; + + public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG"; + + public OpenPGPKeyBasedEncryptor(final String algorithm, final String provider, final String keyring, final String userId, + final char[] passphrase, final String filename) { + this.algorithm = algorithm; + this.provider = provider; + this.keyring = keyring; + this.userId = userId; + this.passphrase = passphrase; + this.filename = filename; + } + + @Override + public StreamCallback getEncryptionCallback() throws Exception { + return new OpenPGPEncryptCallback(algorithm, provider, keyring, userId, filename); + } + + @Override + public StreamCallback getDecryptionCallback() throws Exception { + return new OpenPGPDecryptCallback(provider, keyring, passphrase); + } + + /* + * Validate secret keyring passphrase + */ + public static boolean validateKeyring(String provider, String secretKeyringFile, char[] passphrase) throws IOException, + PGPException, NoSuchProviderException { + PGPSecretKeyRingCollection pgpsec = new PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths + .get(secretKeyringFile)))); + Iterator ringit = pgpsec.getKeyRings(); + while (ringit.hasNext()) { + PGPSecretKeyRing secretkeyring = (PGPSecretKeyRing) ringit.next(); + PGPSecretKey secretkey = secretkeyring.getSecretKey(); + secretkey.extractPrivateKey(passphrase, provider); + 
return true; + } + return false; + } + + /* + * Get the public key for a specific user id from a keyring. + */ + @SuppressWarnings("rawtypes") + public static PGPPublicKey getPublicKey(String userId, String publicKeyring) throws IOException, PGPException { + PGPPublicKey pubkey = null; + PGPPublicKeyRingCollection pgppub = new + PGPPublicKeyRingCollection(PGPUtil.getDecoderStream(Files.newInputStream(Paths.get(publicKeyring)))); + + Iterator ringit = pgppub.getKeyRings(); + while (ringit.hasNext()) { + PGPPublicKeyRing kring = (PGPPublicKeyRing) ringit.next(); + + Iterator keyit = kring.getPublicKeys(); + while (keyit.hasNext()) { + pubkey = (PGPPublicKey) keyit.next(); + boolean userIdMatch = false; + + Iterator userit = pubkey.getUserIDs(); + while (userit.hasNext()) { + String id = userit.next().toString(); + if (id.contains(userId)) { + userIdMatch = true; + break; + } + } + if (pubkey.isEncryptionKey() && userIdMatch) { + return pubkey; + } + } + } + return null; + } + + private class OpenPGPDecryptCallback implements StreamCallback { + + private String provider; + private String secretKeyring; + private char[] passphrase; + + OpenPGPDecryptCallback(final String provider, final String keyring, final char[] passphrase) { + this.provider = provider; + this.secretKeyring = keyring; + this.passphrase = passphrase; + } + + @Override + public void process(InputStream in, OutputStream out) throws IOException { + InputStream pgpin = PGPUtil.getDecoderStream(in); + PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin); + + Object obj = pgpFactory.nextObject(); + if (!(obj instanceof PGPEncryptedDataList)) { + obj = pgpFactory.nextObject(); + if (!(obj instanceof PGPEncryptedDataList)) { + throw new ProcessException("Invalid OpenPGP data"); + } + } + PGPEncryptedDataList encList = (PGPEncryptedDataList) obj; + + PGPSecretKeyRingCollection pgpSecretKeyring; + try { + // open secret keyring file + pgpSecretKeyring = new 
PGPSecretKeyRingCollection(PGPUtil.getDecoderStream(Files + .newInputStream(Paths.get(secretKeyring)))); + } catch (Exception e) { + throw new ProcessException("Invalid secret keyring - " + e.getMessage()); + } + + try { + PGPPrivateKey privateKey = null; + PGPPublicKeyEncryptedData encData = null; + + // find the secret key in the encrypted data + Iterator it = encList.getEncryptedDataObjects(); + while (privateKey == null && it.hasNext()) { + obj = it.next(); + if (!(obj instanceof PGPPublicKeyEncryptedData)) { + throw new ProcessException("Invalid OpenPGP data"); + } + encData = (PGPPublicKeyEncryptedData) obj; + PGPSecretKey secretkey = pgpSecretKeyring.getSecretKey(encData.getKeyID()); + if (secretkey != null) { + privateKey = secretkey.extractPrivateKey(passphrase, provider); + } + } + if (privateKey == null) { + throw new ProcessException("Secret keyring does not contain the key required to decrypt"); + } + + InputStream clearData = encData.getDataStream(privateKey, provider); + PGPObjectFactory clearFactory = new PGPObjectFactory(clearData); + + obj = clearFactory.nextObject(); + if (obj instanceof PGPCompressedData) { + PGPCompressedData compData = (PGPCompressedData) obj; + clearFactory = new PGPObjectFactory(compData.getDataStream()); + obj = clearFactory.nextObject(); + } + PGPLiteralData literal = (PGPLiteralData) obj; + + InputStream lis = literal.getInputStream(); + final byte[] buffer = new byte[4096]; + int len; + while ((len = lis.read(buffer)) >= 0) { + out.write(buffer, 0, len); + } + } catch (Exception e) { + throw new ProcessException(e.getMessage()); + } + } + + } + + private class OpenPGPEncryptCallback implements StreamCallback { + + private String algorithm; + private String provider; + private String publicKeyring; + private String userId; + private String filename; + + OpenPGPEncryptCallback(final String algorithm, final String provider, final String keyring, final String userId, + final String filename) { + this.algorithm = algorithm; + 
this.provider = provider; + this.publicKeyring = keyring; + this.userId = userId; + this.filename = filename; + } + + @Override + public void process(InputStream in, OutputStream out) throws IOException { + PGPPublicKey publicKey; + try { + publicKey = getPublicKey(userId, publicKeyring); + } catch (Exception e) { + throw new ProcessException("Invalid public keyring - " + e.getMessage()); + } + + try { + SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM); + + OutputStream output = out; + if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) { + output = new ArmoredOutputStream(out); + } + + PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false, + secureRandom, provider); + encGenerator.addMethod(publicKey); + OutputStream encOut = encGenerator.open(output, new byte[65536]); + + PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED); + OutputStream compOut = compData.open(encOut, new byte[65536]); + + PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator(); + OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[65536]); + + final byte[] buffer = new byte[4096]; + int len; + while ((len = in.read(buffer)) >= 0) { + literalOut.write(buffer, 0, len); + } + + literalOut.close(); + compOut.close(); + encOut.close(); + output.close(); + } catch (Exception e) { + throw new ProcessException(e.getMessage()); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java new file mode 100644 index 0000000..3e870df --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/OpenPGPPasswordBasedEncryptor.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.processors.standard.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
import java.util.Date;
import java.util.zip.Deflater;

import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.EncryptContent;
import org.apache.nifi.processors.standard.EncryptContent.Encryptor;
import org.bouncycastle.bcpg.ArmoredOutputStream;
import org.bouncycastle.openpgp.PGPCompressedData;
import org.bouncycastle.openpgp.PGPCompressedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedData;
import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
import org.bouncycastle.openpgp.PGPEncryptedDataList;
import org.bouncycastle.openpgp.PGPLiteralData;
import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
import org.bouncycastle.openpgp.PGPObjectFactory;
import org.bouncycastle.openpgp.PGPPBEEncryptedData;
import org.bouncycastle.openpgp.PGPUtil;

/**
 * {@link Encryptor} implementation that encrypts and decrypts content with an OpenPGP
 * passphrase (password-based encryption) through the Bouncy Castle OpenPGP API.
 *
 * <p>The instance only stores its configuration; the actual work is done by the
 * {@link StreamCallback}s returned from {@link #getEncryptionCallback()} and
 * {@link #getDecryptionCallback()}.</p>
 *
 * <p>The passphrase array is stored by reference, not copied, so changes made to it by the
 * caller after construction are visible to this object.</p>
 */
public class OpenPGPPasswordBasedEncryptor implements Encryptor {

    /** Name of the {@link SecureRandom} algorithm requested when encrypting. */
    public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";

    /** Size of the scratch buffer used when copying content between streams. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /** Size of the buffer handed to each Bouncy Castle generator when it is opened. */
    private static final int PGP_BUFFER_SIZE = 65536;

    private final String algorithm;
    private final String provider;
    private final char[] password;
    private final String filename;

    /**
     * @param algorithm  algorithm name; on encryption it is only consulted (through
     *                   {@link EncryptContent#isPGPArmoredAlgorithm(String)}) to decide whether the
     *                   output is ASCII-armored
     * @param provider   name of the security provider passed to Bouncy Castle
     * @param passphrase passphrase used for both encryption and decryption (kept by reference)
     * @param filename   file name recorded in the literal data packet when encrypting
     */
    public OpenPGPPasswordBasedEncryptor(final String algorithm, final String provider, final char[] passphrase, final String filename) {
        this.algorithm = algorithm;
        this.provider = provider;
        this.password = passphrase;
        this.filename = filename;
    }

    /**
     * @return a new callback that encrypts the input stream into the output stream
     */
    @Override
    public StreamCallback getEncryptionCallback() throws Exception {
        return new OpenPGPEncryptCallback(algorithm, provider, password, filename);
    }

    /**
     * @return a new callback that decrypts the input stream into the output stream
     */
    @Override
    public StreamCallback getDecryptionCallback() throws Exception {
        return new OpenPGPDecryptCallback(provider, password);
    }

    /**
     * Copies everything readable from {@code source} to {@code sink}. Neither stream is closed.
     */
    private static void copy(final InputStream source, final OutputStream sink) throws IOException {
        final byte[] buffer = new byte[COPY_BUFFER_SIZE];
        int len;
        while ((len = source.read(buffer)) >= 0) {
            sink.write(buffer, 0, len);
        }
    }

    /**
     * Decrypts passphrase-protected OpenPGP content.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing instance.</p>
     */
    private static class OpenPGPDecryptCallback implements StreamCallback {

        private final String provider;
        private final char[] password;

        OpenPGPDecryptCallback(final String provider, final char[] password) {
            this.provider = provider;
            this.password = password;
        }

        /**
         * Reads OpenPGP data from {@code in}, decrypts it with the passphrase and writes the
         * literal (clear) content to {@code out}.
         *
         * @throws ProcessException if the input is not password-encrypted OpenPGP data, or if
         *                          decryption or copying fails (the original exception is attached
         *                          as the cause)
         */
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
            InputStream pgpin = PGPUtil.getDecoderStream(in);
            PGPObjectFactory pgpFactory = new PGPObjectFactory(pgpin);

            // The encrypted-data list is accepted as either the first or the second object;
            // anything else is rejected.
            Object obj = pgpFactory.nextObject();
            if (!(obj instanceof PGPEncryptedDataList)) {
                obj = pgpFactory.nextObject();
                if (!(obj instanceof PGPEncryptedDataList)) {
                    throw new ProcessException("Invalid OpenPGP data");
                }
            }
            PGPEncryptedDataList encList = (PGPEncryptedDataList) obj;

            // Guard against an empty list so malformed input is reported as invalid data
            // instead of escaping as an IndexOutOfBoundsException from get(0).
            if (encList.size() == 0) {
                throw new ProcessException("Invalid OpenPGP data");
            }

            // Only the first entry is considered, and it must be password-based.
            obj = encList.get(0);
            if (!(obj instanceof PGPPBEEncryptedData)) {
                throw new ProcessException("Invalid OpenPGP data");
            }
            PGPPBEEncryptedData encData = (PGPPBEEncryptedData) obj;

            try {
                InputStream clearData = encData.getDataStream(password, provider);
                PGPObjectFactory clearFactory = new PGPObjectFactory(clearData);

                // The payload may optionally be wrapped in a compressed-data packet.
                obj = clearFactory.nextObject();
                if (obj instanceof PGPCompressedData) {
                    PGPCompressedData compData = (PGPCompressedData) obj;
                    clearFactory = new PGPObjectFactory(compData.getDataStream());
                    obj = clearFactory.nextObject();
                }
                if (!(obj instanceof PGPLiteralData)) {
                    // Covers both a missing (null) object and an unexpected packet type.
                    throw new ProcessException("Invalid OpenPGP data");
                }
                PGPLiteralData literal = (PGPLiteralData) obj;

                copy(literal.getInputStream(), out);
            } catch (ProcessException e) {
                // Already the exception type callers expect; do not wrap it a second time.
                throw e;
            } catch (Exception e) {
                // Keep the cause so the underlying failure remains diagnosable.
                throw new ProcessException(e.getMessage(), e);
            }
        }

    }

    /**
     * Encrypts content as passphrase-protected OpenPGP data.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing instance.</p>
     */
    private static class OpenPGPEncryptCallback implements StreamCallback {

        private final String algorithm;
        private final String provider;
        private final char[] password;
        private final String filename;

        OpenPGPEncryptCallback(final String algorithm, final String provider, final char[] password, final String filename) {
            this.algorithm = algorithm;
            this.provider = provider;
            this.password = password;
            this.filename = filename;
        }

        /**
         * Writes {@code in} to {@code out} as an OpenPGP message: a binary literal data packet
         * (carrying the configured file name and the current date), ZIP-compressed at
         * {@link Deflater#BEST_SPEED}, encrypted with CAST5 under the passphrase. The output is
         * wrapped in an {@link ArmoredOutputStream} when
         * {@link EncryptContent#isPGPArmoredAlgorithm(String)} returns true for the algorithm.
         *
         * @throws ProcessException if any step fails (the original exception is attached as the
         *                          cause)
         */
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
            try {
                SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);

                OutputStream output = out;
                if (EncryptContent.isPGPArmoredAlgorithm(algorithm)) {
                    output = new ArmoredOutputStream(out);
                }

                // NOTE(review): the 'false' argument appears to disable the integrity packet;
                // it is left unchanged so the produced format stays the same.
                PGPEncryptedDataGenerator encGenerator = new PGPEncryptedDataGenerator(PGPEncryptedData.CAST5, false,
                        secureRandom, provider);
                encGenerator.addMethod(password);
                OutputStream encOut = encGenerator.open(output, new byte[PGP_BUFFER_SIZE]);

                PGPCompressedDataGenerator compData = new PGPCompressedDataGenerator(PGPCompressedData.ZIP, Deflater.BEST_SPEED);
                OutputStream compOut = compData.open(encOut, new byte[PGP_BUFFER_SIZE]);

                PGPLiteralDataGenerator literal = new PGPLiteralDataGenerator();
                OutputStream literalOut = literal.open(compOut, PGPLiteralData.BINARY, filename, new Date(), new byte[PGP_BUFFER_SIZE]);

                copy(in, literalOut);

                // Close innermost first so each layer can flush its trailer into the next one.
                literalOut.close();
                compOut.close();
                encOut.close();
                output.close();
            } catch (Exception e) {
                // Keep the cause so the underlying failure remains diagnosable.
                throw new ProcessException(e.getMessage(), e);
            }

        }

    }
}
