http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index 7a2f57e..5a75172 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; - import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.util.FormatUtils; @@ -50,6 +49,12 @@ public class RepositoryConfiguration { private int journalCount = 16; private int compressionBlockBytes = 1024 * 1024; private int maxAttributeChars = 65536; + private int debugFrequency = 1_000_000; + + private Map<String, String> encryptionKeys; + private String keyId; + private String keyProviderImplementation; + private String keyProviderLocation; private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>(); @@ -360,6 +365,54 @@ public class RepositoryConfiguration { return Optional.ofNullable(warmCacheFrequencyMinutes); } + public boolean supportsEncryption() { + boolean keyProviderIsConfigured = 
CryptoUtils.isValidKeyProvider(keyProviderImplementation, keyProviderLocation, keyId, encryptionKeys); + + return keyProviderIsConfigured; + } + + public Map<String, String> getEncryptionKeys() { + return encryptionKeys; + } + + public void setEncryptionKeys(Map<String, String> encryptionKeys) { + this.encryptionKeys = encryptionKeys; + } + + public String getKeyId() { + return keyId; + } + + public void setKeyId(String keyId) { + this.keyId = keyId; + } + + public String getKeyProviderImplementation() { + return keyProviderImplementation; + } + + public void setKeyProviderImplementation(String keyProviderImplementation) { + this.keyProviderImplementation = keyProviderImplementation; + } + + public String getKeyProviderLocation() { + return keyProviderLocation; + } + + public void setKeyProviderLocation(String keyProviderLocation) { + this.keyProviderLocation = keyProviderLocation; + } + + + public int getDebugFrequency() { + return debugFrequency; + } + + public void setDebugFrequency(int debugFrequency) { + this.debugFrequency = debugFrequency; + } + + public static RepositoryConfiguration create(final NiFiProperties nifiProperties) { final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths(); if (storageDirectories.isEmpty()) { @@ -436,6 +489,17 @@ public class RepositoryConfiguration { config.setAlwaysSync(alwaysSync); + config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency())); + + // Encryption values may not be present but are only required for EncryptedWriteAheadProvenanceRepository + final String implementationClassName = nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); + if (EncryptedWriteAheadProvenanceRepository.class.getName().equals(implementationClassName)) { + config.setEncryptionKeys(nifiProperties.getProvenanceRepoEncryptionKeys()); + 
config.setKeyId(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID)); + config.setKeyProviderImplementation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS)); + config.setKeyProviderLocation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION)); + } + return config; } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 8975028..4782dbe 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; - import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; @@ -84,7 +83,7 @@ import org.slf4j.LoggerFactory; */ public class WriteAheadProvenanceRepository implements ProvenanceRepository { private static final Logger logger = LoggerFactory.getLogger(WriteAheadProvenanceRepository.class); - private static final int BLOCK_SIZE = 1024 * 32; + static final int BLOCK_SIZE = 1024 * 32; public static final String EVENT_CATEGORY = "Provenance Repository"; private final RepositoryConfiguration config; @@ -129,6 +128,14 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { } }; + init(recordWriterFactory, recordReaderFactory, eventReporter, authorizer, resourceFactory); + } + + synchronized void init(RecordWriterFactory 
recordWriterFactory, RecordReaderFactory recordReaderFactory, + final EventReporter eventReporter, final Authorizer authorizer, + final ProvenanceAuthorizableFactory resourceFactory) throws IOException { + final EventFileManager fileManager = new EventFileManager(); + eventStore = new PartitionedWriteAheadEventStore(config, recordWriterFactory, recordReaderFactory, eventReporter, fileManager); final IndexManager indexManager = new SimpleIndexManager(config); @@ -145,7 +152,7 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { eventStore.reindexLatestEvents(eventIndex); } catch (final Exception e) { logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest " - + "events will not be available from the Provenance Repository when a query is issued.", e); + + "events will not be available from the Provenance Repository when a query is issued.", e); } } @@ -282,4 +289,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { public List<SearchableField> getSearchableAttributes() { return Collections.unmodifiableList(config.getSearchableAttributes()); } + + RepositoryConfiguration getConfig() { + return this.config; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index a583403..f4b47d3 100644 --- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -36,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; @@ -246,7 +245,7 @@ public class LuceneEventIndex implements EventIndex { final Document document = eventConverter.convert(event, summary); if (document == null) { - logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event); + logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId()); } else { final File indexDir; if (event.getEventTime() == lastEventTime) { @@ -291,7 +290,7 @@ public class LuceneEventIndex implements EventIndex { final Document document = eventConverter.convert(event, location); if (document == null) { - logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event); + logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId()); } else { final StoredDocument doc = new StoredDocument(document, location); boolean added = false; @@ -357,13 +356,13 @@ public class LuceneEventIndex implements EventIndex { eventOption = eventStore.getEvent(eventId); } catch (final Exception e) { logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " 
+ e, e); - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity()); result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information."); return result; } if (!eventOption.isPresent()) { - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity()); result.getResult().setError("Could not find Provenance Event with ID " + eventId); lineageSubmissionMap.put(result.getLineageIdentifier(), result); return result; @@ -524,7 +523,7 @@ public class LuceneEventIndex implements EventIndex { } default: { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, - eventId, Collections.<String> emptyList(), 1, userId); + eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); @@ -533,7 +532,7 @@ public class LuceneEventIndex implements EventIndex { } } catch (final Exception e) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, - eventId, Collections.<String> emptyList(), 1, userId); + eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Failed to expand 
children for lineage of event with ID " + eventId + " due to: " + e); return submission; @@ -564,7 +563,7 @@ public class LuceneEventIndex implements EventIndex { } default: { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, - eventId, Collections.<String> emptyList(), 1, userId); + eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); @@ -573,7 +572,7 @@ public class LuceneEventIndex implements EventIndex { } } catch (final Exception e) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, - eventId, Collections.<String> emptyList(), 1, userId); + eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + e); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java index d6f50dd..2bc7fbe 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java @@ -56,4 +56,12 @@ public class EventFieldNames { public static final String EXPLICIT_VALUE = "Explicit Value"; public static final String LOOKUP_VALUE = "Lookup Value"; public static final String UNCHANGED_VALUE = "Unchanged"; + + // For encrypted records + public static final String IS_ENCRYPTED = "Encrypted Record"; + public static final String KEY_ID = "Encryption Key ID"; + public static final String VERSION = "Encryption Version"; + public static final String ALGORITHM = "Encryption Algorithm"; + public static final String IV = "Initialization Vector"; + public static final String ENCRYPTION_DETAILS = "Encryption Details"; } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java index 7b33ded..0577636 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java @@ -66,12 +66,12 @@ public class LookupTableEventRecordFields { public static final RecordField CONTENT_CLAIM_SIZE = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE); public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE, - CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); + CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); public static final RecordField CURRENT_CONTENT_CLAIM_EXPLICIT = new ComplexRecordField(EventFieldNames.EXPLICIT_VALUE, EXACTLY_ONE, - CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); + CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE); public static final RecordField CURRENT_CONTENT_CLAIM = new UnionRecordField(EventFieldNames.CONTENT_CLAIM, - Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT); + Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT); // EventType-Specific fields http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java index 7110336..d596c8e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java @@ -45,7 +45,6 @@ import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.UPD import java.util.ArrayList; import java.util.Collections; import java.util.List; - import org.apache.nifi.repository.schema.RecordField; import org.apache.nifi.repository.schema.RecordSchema; @@ -90,5 +89,4 @@ public class LookupTableEventSchema { final RecordSchema schema = new RecordSchema(fields); return schema; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java index 93c0669..dfbcd2b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java @@ -25,7 +25,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.zip.GZIPInputStream; - import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.toc.TocReader; @@ -333,7 +332,7 @@ public 
abstract class CompressableRecordReader implements RecordReader { try { boolean read = true; while (read) { - final Optional<StandardProvenanceEventRecord> eventOptional = readToEvent(eventId, dis, serializationVersion); + final Optional<StandardProvenanceEventRecord> eventOptional = this.readToEvent(eventId, dis, serializationVersion); if (eventOptional.isPresent()) { pushbackEvent = eventOptional.get(); return Optional.of(pushbackEvent); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index 8e79ddd..7ae4adc 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -27,9 +27,11 @@ import java.io.InputStream; import java.nio.file.Path; import java.util.Collection; import java.util.zip.GZIPInputStream; - +import org.apache.nifi.properties.NiFiPropertiesLoader; import org.apache.nifi.provenance.ByteArraySchemaRecordReader; import org.apache.nifi.provenance.ByteArraySchemaRecordWriter; +import org.apache.nifi.provenance.CryptoUtils; +import org.apache.nifi.provenance.EncryptedSchemaRecordReader; import org.apache.nifi.provenance.EventIdFirstSchemaRecordReader; import 
org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter; import org.apache.nifi.provenance.StandardRecordReader; @@ -37,17 +39,25 @@ import org.apache.nifi.provenance.lucene.LuceneUtil; import org.apache.nifi.provenance.toc.StandardTocReader; import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RecordReaders { + private static Logger logger = LoggerFactory.getLogger(RecordReaders.class); + + private static boolean isEncryptionAvailable = false; + private static boolean encryptionPropertiesRead = false; + /** * Creates a new Record Reader that is capable of reading Provenance Event Journals * - * @param file the Provenance Event Journal to read data from + * @param file the Provenance Event Journal to read data from * @param provenanceLogFiles collection of all provenance journal files - * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid - * issues where a FlowFile has an extremely large attribute and reading events - * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap + * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. 
This allows us to avoid + * issues where a FlowFile has an extremely large attribute and reading events + * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap * @return a Record Reader capable of reading Provenance Event Journals * @throws IOException if unable to create a Record Reader for the given file */ @@ -68,7 +78,7 @@ public class RecordReaders { } } - if ( file.exists() ) { + if (file.exists()) { try { fis = new FileInputStream(file); } catch (final FileNotFoundException fnfe) { @@ -77,7 +87,8 @@ public class RecordReaders { } String filename = file.getName(); - openStream: while ( fis == null ) { + openStream: + while (fis == null) { final File dir = file.getParentFile(); final String baseName = LuceneUtil.substringBefore(file.getName(), ".prov"); @@ -85,9 +96,9 @@ public class RecordReaders { // filename that we need. The majority of the time, we will use the extension ".prov.gz" // because most often we are compressing on rollover and most often we have already finished // compressing by the time that we are querying the data. 
- for ( final String extension : new String[] {".prov.gz", ".prov"} ) { + for (final String extension : new String[]{".prov.gz", ".prov"}) { file = new File(dir, baseName + extension); - if ( file.exists() ) { + if (file.exists()) { try { fis = new FileInputStream(file); filename = baseName + extension; @@ -104,7 +115,7 @@ public class RecordReaders { break; } - if ( fis == null ) { + if (fis == null) { throw new FileNotFoundException("Unable to locate file " + originalFile); } @@ -148,12 +159,25 @@ public class RecordReaders { final TocReader tocReader = new StandardTocReader(tocFile); return new EventIdFirstSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars); } + case EncryptedSchemaRecordReader.SERIALIZATION_NAME: { + if (!tocFile.exists()) { + throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist"); + } + + if (!isEncryptionAvailable()) { + throw new IOException("Cannot read encrypted repository because this reader is not configured for encryption"); + } + + final TocReader tocReader = new StandardTocReader(tocFile); + // Return a reader with no eventEncryptor because this method contract cannot change, then inject the encryptor from the writer in the calling method + return new EncryptedSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars, null); + } default: { throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName); } } } catch (final IOException ioe) { - if ( fis != null ) { + if (fis != null) { try { fis.close(); } catch (final IOException inner) { @@ -165,4 +189,20 @@ public class RecordReaders { } } + private static boolean isEncryptionAvailable() { + if (encryptionPropertiesRead) { + return isEncryptionAvailable; + } else { + try { + NiFiProperties niFiProperties = NiFiPropertiesLoader.loadDefaultWithKeyFromBootstrap(); + isEncryptionAvailable = 
CryptoUtils.isProvenanceRepositoryEncryptionConfigured(niFiProperties); + encryptionPropertiesRead = true; + } catch (IOException e) { + logger.error("Encountered an error checking the provenance repository encryption configuration: ", e); + isEncryptionAvailable = false; + } + return isEncryptionAvailable; + } + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java index 41d5ade..766278a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java @@ -19,7 +19,6 @@ package org.apache.nifi.provenance.util; import java.util.List; import java.util.Map; - import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.serialization.StorageSummary; @@ -182,4 +181,14 @@ public class StorageSummaryEvent implements ProvenanceEventRecord { public Long getPreviousContentClaimOffset() { return event.getPreviousContentClaimOffset(); } + + /** + * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability). 
+ * + * @return a descriptive event ID to allow tracing + */ + @Override + public String getBestEventIdentifier() { + return Long.toString(getEventId()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository index 6a353d2..94cc70c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
org.apache.nifi.provenance.PersistentProvenanceRepository -org.apache.nifi.provenance.WriteAheadProvenanceRepository \ No newline at end of file +org.apache.nifi.provenance.WriteAheadProvenanceRepository +org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy new file mode 100644 index 0000000..ec8c225 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance + +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.provenance.serialization.RecordReader +import org.apache.nifi.provenance.serialization.RecordWriter +import org.apache.nifi.provenance.toc.StandardTocReader +import org.apache.nifi.provenance.toc.StandardTocWriter +import org.apache.nifi.provenance.toc.TocReader +import org.apache.nifi.provenance.toc.TocUtil +import org.apache.nifi.provenance.toc.TocWriter +import org.apache.nifi.util.file.FileUtils +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.bouncycastle.util.encoders.Hex +import org.junit.After +import org.junit.AfterClass +import org.junit.Before +import org.junit.BeforeClass +import org.junit.ClassRule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import javax.crypto.Cipher +import javax.crypto.spec.SecretKeySpec +import java.security.KeyManagementException +import java.security.Security +import java.util.concurrent.atomic.AtomicLong + +import static groovy.test.GroovyAssert.shouldFail +import static org.apache.nifi.provenance.TestUtil.createFlowFile + +@RunWith(JUnit4.class) +class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWriter { + private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReaderWriterTest.class) + + private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210" + private static final String KEY_HEX_256 = KEY_HEX_128 * 2 + private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? 
KEY_HEX_256 : KEY_HEX_128 + private static final String KEY_ID = "K1" + + private static final String TRANSIT_URI = "nifi://unit-test" + private static final String PROCESSOR_TYPE = "Mock Processor" + private static final String COMPONENT_ID = "1234" + + private static final int UNCOMPRESSED_BLOCK_SIZE = 1024 * 32 + private static final int MAX_ATTRIBUTE_SIZE = 2048 + + private static final AtomicLong idGenerator = new AtomicLong(0L) + private File journalFile + private File tocFile + + private static KeyProvider mockKeyProvider + private static ProvenanceEventEncryptor provenanceEventEncryptor = new AESProvenanceEventEncryptor() + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder() + + private static String ORIGINAL_LOG_LEVEL + + @BeforeClass + static void setUpOnce() throws Exception { + ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance") + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG") + + Security.addProvider(new BouncyCastleProvider()) + + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + + mockKeyProvider = [ + getKey : { String keyId -> + logger.mock("Requesting key ID: ${keyId}") + if (keyId == KEY_ID) { + new SecretKeySpec(Hex.decode(KEY_HEX), "AES") + } else { + throw new KeyManagementException("${keyId} is not available") + } + }, + getAvailableKeyIds: { -> + logger.mock("Available key IDs: [${KEY_ID}]") + [KEY_ID] + }, + keyExists : { String keyId -> + logger.mock("Checking availability of key ID: ${keyId}") + keyId == KEY_ID + }] as KeyProvider + provenanceEventEncryptor.initialize(mockKeyProvider) + } + + @Before + void setUp() throws Exception { + journalFile = new File("target/storage/${UUID.randomUUID()}/testEventIdFirstSchemaRecordReaderWriter") + tocFile = TocUtil.getTocFile(journalFile) + idGenerator.set(0L) + } + + @After + void tearDown() throws 
Exception { + try { + FileUtils.deleteFile(journalFile.getParentFile(), true) + } catch (Exception e) { + logger.error(e.getMessage()) + } + } + + @AfterClass + static void tearDownOnce() throws Exception { + if (ORIGINAL_LOG_LEVEL) { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL) + } + try { + FileUtils.deleteFile(new File("target/storage"), true) + } catch (Exception e) { + logger.error(e) + } + } + + private static boolean isUnlimitedStrengthCryptoAvailable() { + Cipher.getMaxAllowedKeyLength("AES") > 128 + } + + private static + final FlowFile buildFlowFile(Map attributes = [:], long id = idGenerator.getAndIncrement(), long fileSize = 3000L) { + if (!attributes?.uuid) { + attributes.uuid = UUID.randomUUID().toString() + } + createFlowFile(id, fileSize, attributes) + } + + private + static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) { + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder() + builder.setEventTime(eventTime) + builder.setEventType(eventType) + builder.setTransitUri(transitUri) + builder.fromFlowFile(flowfile) + builder.setComponentId(componentId) + builder.setComponentType(componentType) + builder.build() + } + + @Override + protected RecordWriter createWriter( + final File file, + final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { + createWriter(file, tocWriter, compressed, uncompressedBlockSize, provenanceEventEncryptor) + } + + protected static RecordWriter createWriter( + final File file, + final TocWriter tocWriter, + final boolean compressed, + final int uncompressedBlockSize, ProvenanceEventEncryptor encryptor) throws IOException { + return new 
EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY, encryptor, 1) + } + + @Override + protected RecordReader createReader( + final InputStream inputStream, + final String journalFilename, final TocReader tocReader, final int maxAttributeSize) throws IOException { + return new EncryptedSchemaRecordReader(inputStream, journalFilename, tocReader, maxAttributeSize, provenanceEventEncryptor) + } + + /** + * Build a record and write it to the repository with the encrypted writer. Recover with the encrypted reader and verify. + */ + @Test + void testShouldWriteAndReadEncryptedRecord() { + // Arrange + final ProvenanceEventRecord record = buildEventRecord() + logger.info("Built sample PER: ${record}") + + TocWriter tocWriter = new StandardTocWriter(tocFile, false, false) + + RecordWriter encryptedWriter = createWriter(journalFile, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE) + logger.info("Generated encrypted writer: ${encryptedWriter}") + + // Act + int encryptedRecordId = idGenerator.get() + encryptedWriter.writeHeader(encryptedRecordId) + encryptedWriter.writeRecord(record) + encryptedWriter.close() + logger.info("Wrote encrypted record ${encryptedRecordId} to journal") + + // Assert + TocReader tocReader = new StandardTocReader(tocFile) + final FileInputStream fis = new FileInputStream(journalFile) + final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE) + logger.info("Generated encrypted reader: ${reader}") + + ProvenanceEventRecord encryptedEvent = reader.nextRecord() + assert encryptedEvent + assert encryptedRecordId as long == encryptedEvent.getEventId() + assert record.componentId == encryptedEvent.getComponentId() + assert record.componentType == encryptedEvent.getComponentType() + logger.info("Successfully read encrypted record: ${encryptedEvent}") + + assert !reader.nextRecord() + } + + /** + * Build a record and write it with a standard writer and the 
encrypted writer to different repositories. Recover with the standard reader and the contents of the encrypted record should be unreadable. + */ + @Test + void testShouldWriteEncryptedRecordAndPlainRecord() { + // Arrange + final ProvenanceEventRecord record = buildEventRecord() + logger.info("Built sample PER: ${record}") + + TocWriter tocWriter = new StandardTocWriter(tocFile, false, false) + + RecordWriter standardWriter = new EventIdFirstSchemaRecordWriter(journalFile, idGenerator, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE, IdentifierLookup.EMPTY) + logger.info("Generated standard writer: ${standardWriter}") + + File encryptedJournalFile = new File(journalFile.absolutePath + "_encrypted") + File encryptedTocFile = TocUtil.getTocFile(encryptedJournalFile) + TocWriter encryptedTocWriter = new StandardTocWriter(encryptedTocFile, false, false) + RecordWriter encryptedWriter = createWriter(encryptedJournalFile, encryptedTocWriter, false, UNCOMPRESSED_BLOCK_SIZE) + logger.info("Generated encrypted writer: ${encryptedWriter}") + + // Act + int standardRecordId = idGenerator.get() + standardWriter.writeHeader(standardRecordId) + standardWriter.writeRecord(record) + standardWriter.close() + logger.info("Wrote standard record ${standardRecordId} to journal") + + int encryptedRecordId = idGenerator.get() + encryptedWriter.writeHeader(encryptedRecordId) + encryptedWriter.writeRecord(record) + encryptedWriter.close() + logger.info("Wrote encrypted record ${encryptedRecordId} to journal") + + // Assert + TocReader tocReader = new StandardTocReader(tocFile) + final FileInputStream fis = new FileInputStream(journalFile) + final RecordReader reader = new EventIdFirstSchemaRecordReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE) + logger.info("Generated standard reader: ${reader}") + + ProvenanceEventRecord standardEvent = reader.nextRecord() + assert standardEvent + assert standardRecordId as long == standardEvent.getEventId() + assert record.componentId == 
standardEvent.getComponentId() + assert record.componentType == standardEvent.getComponentType() + logger.info("Successfully read standard record: ${standardEvent}") + + assert !reader.nextRecord() + + // Demonstrate unable to read from encrypted file with standard reader + TocReader incompatibleTocReader = new StandardTocReader(encryptedTocFile) + final FileInputStream efis = new FileInputStream(encryptedJournalFile) + RecordReader incompatibleReader = new EventIdFirstSchemaRecordReader(efis, encryptedJournalFile.getName(), incompatibleTocReader, MAX_ATTRIBUTE_SIZE) + logger.info("Generated standard reader (attempting to read encrypted file): ${incompatibleReader}") + + def msg = shouldFail(EOFException) { + ProvenanceEventRecord encryptedEvent = incompatibleReader.nextRecord() + } + logger.expected(msg) + assert msg =~ "EOFException: Failed to read field" + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy new file mode 100644 index 0000000..42cc881 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance + +import org.apache.nifi.events.EventReporter +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.provenance.serialization.RecordReaders +import org.apache.nifi.reporting.Severity +import org.apache.nifi.util.file.FileUtils +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.junit.After +import org.junit.AfterClass +import org.junit.Before +import org.junit.BeforeClass +import org.junit.ClassRule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import javax.crypto.Cipher +import java.security.Security +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong + +import static org.apache.nifi.provenance.TestUtil.createFlowFile + +@RunWith(JUnit4.class) +class EncryptedWriteAheadProvenanceRepositoryTest { + private static final Logger logger = LoggerFactory.getLogger(EncryptedWriteAheadProvenanceRepositoryTest.class) + + private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210" + private static final String KEY_HEX_256 = KEY_HEX_128 * 2 + private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? 
KEY_HEX_256 : KEY_HEX_128 + private static final String KEY_ID = "K1" + + private static final String TRANSIT_URI = "nifi://unit-test" + private static final String PROCESSOR_TYPE = "Mock Processor" + private static final String COMPONENT_ID = "1234" + + private static final AtomicLong recordId = new AtomicLong() + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder() + + private ProvenanceRepository repo + private static RepositoryConfiguration config + + public static final int DEFAULT_ROLLOVER_MILLIS = 2000 + private EventReporter eventReporter + private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>()) + + private static String ORIGINAL_LOG_LEVEL + + @BeforeClass + static void setUpOnce() throws Exception { + ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance") + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG") + + Security.addProvider(new BouncyCastleProvider()) + + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() throws Exception { + reportedEvents?.clear() + eventReporter = createMockEventReporter() + } + + @After + void tearDown() throws Exception { + closeRepo(repo, config) + + // Reset the boolean determiner + RecordReaders.encryptionPropertiesRead = false + RecordReaders.isEncryptionAvailable = false + } + + @AfterClass + static void tearDownOnce() throws Exception { + if (ORIGINAL_LOG_LEVEL) { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL) + } + } + + private static boolean isUnlimitedStrengthCryptoAvailable() { + Cipher.getMaxAllowedKeyLength("AES") > 128 + } + + private static RepositoryConfiguration createConfiguration() { + RepositoryConfiguration config = new RepositoryConfiguration() + config.addStorageDirectory("1", new 
File("target/storage/" + UUID.randomUUID().toString())) + config.setCompressOnRollover(true) + config.setMaxEventFileLife(2000L, TimeUnit.SECONDS) + config.setCompressionBlockBytes(100) + return config + } + + private static RepositoryConfiguration createEncryptedConfiguration() { + RepositoryConfiguration config = createConfiguration() + config.setEncryptionKeys([(KEY_ID): KEY_HEX]) + config.setKeyId(KEY_ID) + config.setKeyProviderImplementation(StaticKeyProvider.class.name) + config + } + + private EventReporter createMockEventReporter() { + [reportEvent: { Severity s, String c, String m -> + ReportedEvent event = new ReportedEvent(s, c, m) + reportedEvents.add(event) + logger.mock("Added ${event}") + }] as EventReporter + } + + private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException { + if (repo == null) { + return + } + + try { + repo.close() + } catch (IOException ioe) { + } + + // Delete all of the storage files. We do this in order to clean up the tons of files that + // we create but also to ensure that we have closed all of the file handles. If we leave any + // streams open, for instance, this will throw an IOException, causing our unit test to fail. + if (config != null) { + for (final File storageDir : config.getStorageDirectories().values()) { + int i + for (i = 0; i < 3; i++) { + try { + FileUtils.deleteFile(storageDir, true) + break + } catch (IOException ioe) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. + if (i == 2) { + throw ioe + } else { + try { + System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists()) + FileUtils.deleteFile(storageDir, true) + break + } catch (final IOException ioe2) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. 
+ if (i == 2) { + throw ioe2 + } else { + try { + Thread.sleep(1000L) + } catch (final InterruptedException ie) { + } + } + } + } + } + } + } + } + } + + private static + final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) { + if (!attributes?.uuid) { + attributes.uuid = UUID.randomUUID().toString() + } + createFlowFile(id, fileSize, attributes) + } + + private + static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) { + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder() + builder.setEventTime(eventTime) + builder.setEventType(eventType) + builder.setTransitUri(transitUri) + builder.fromFlowFile(flowfile) + builder.setComponentId(componentId) + builder.setComponentType(componentType) + builder.build() + } + + /** + * This test operates on {@link PersistentProvenanceRepository} to verify the normal operations of existing implementations. 
+ * + * @throws IOException + * @throws InterruptedException + */ + @Test + void testPersistentProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException { + // Arrange + config = createConfiguration() + config.setMaxEventFileCapacity(1L) + config.setMaxEventFileLife(1, TimeUnit.SECONDS) + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) + repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY) + + Map attributes = ["abc": "xyz", + "123": "456"] + final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes)) + + final int RECORD_COUNT = 10 + + // Act + RECORD_COUNT.times { + repo.registerEvent(record) + } + + // Sleep to let the journal merge occur + Thread.sleep(1000L) + + final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1) + + logger.info("Recovered ${recoveredRecords.size()} events: ") + recoveredRecords.each { logger.info("\t${it}") } + + // Assert + assert recoveredRecords.size() == RECORD_COUNT + recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i -> + assert recovered.getEventId() == (i as Long) + assert recovered.getTransitUri() == TRANSIT_URI + assert recovered.getEventType() == ProvenanceEventType.RECEIVE + // The UUID was added later but we care that all attributes we provided are still there + assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet()) + } + } + + /** + * This test operates on {@link WriteAheadProvenanceRepository} to verify the normal operations of existing implementations. 
+ * + * @throws IOException + * @throws InterruptedException + */ + @Test + void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException { + // Arrange + config = createConfiguration() + // Needed until NIFI-3605 is implemented +// config.setMaxEventFileCapacity(1L) + config.setMaxEventFileCount(1) + config.setMaxEventFileLife(1, TimeUnit.SECONDS) + repo = new WriteAheadProvenanceRepository(config) + repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY) + + Map attributes = ["abc": "xyz", + "123": "456"] + final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes)) + + final int RECORD_COUNT = 10 + + // Act + RECORD_COUNT.times { + repo.registerEvent(record) + } + + final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1) + + logger.info("Recovered ${recoveredRecords.size()} events: ") + recoveredRecords.each { logger.info("\t${it}") } + + // Assert + assert recoveredRecords.size() == RECORD_COUNT + recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i -> + assert recovered.getEventId() == (i as Long) + assert recovered.getTransitUri() == TRANSIT_URI + assert recovered.getEventType() == ProvenanceEventType.RECEIVE + // The UUID was added later but we care that all attributes we provided are still there + assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet()) + } + } + + @Test + void testShouldRegisterAndGetEvent() { + // Arrange + + // Override the boolean determiner + RecordReaders.encryptionPropertiesRead = true + RecordReaders.isEncryptionAvailable = true + + config = createEncryptedConfiguration() + // Needed until NIFI-3605 is implemented +// config.setMaxEventFileCapacity(1L) + config.setMaxEventFileCount(1) + config.setMaxEventFileLife(1, TimeUnit.SECONDS) + repo = new EncryptedWriteAheadProvenanceRepository(config) + repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY) + + Map 
attributes = ["abc": "This is a plaintext attribute.", + "123": "This is another plaintext attribute."] + final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes)) + + final long LAST_RECORD_ID = repo.getMaxEventId() + + // Act + repo.registerEvent(record) + + // Retrieve the event through the interface + ProvenanceEventRecord recoveredRecord = repo.getEvent(LAST_RECORD_ID + 1) + logger.info("Recovered ${recoveredRecord}") + + // Assert + assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + assert recoveredRecord.getTransitUri() == TRANSIT_URI + assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE + // The UUID was added later but we care that all attributes we provided are still there + assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet()) + } + + @Test + void testShouldRegisterAndGetEvents() { + // Arrange + final int RECORD_COUNT = 10 + + // Override the boolean determiner + RecordReaders.encryptionPropertiesRead = true + RecordReaders.isEncryptionAvailable = true + + config = createEncryptedConfiguration() + // Needed until NIFI-3605 is implemented +// config.setMaxEventFileCapacity(1L) + config.setMaxEventFileCount(1) + config.setMaxEventFileLife(1, TimeUnit.SECONDS) + repo = new EncryptedWriteAheadProvenanceRepository(config) + repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY) + + Map attributes = ["abc": "This is a plaintext attribute.", + "123": "This is another plaintext attribute."] + final List<ProvenanceEventRecord> records = [] + RECORD_COUNT.times { int i -> + records << buildEventRecord(buildFlowFile(attributes + [count: i as String])) + } + logger.info("Generated ${RECORD_COUNT} records") + + final long LAST_RECORD_ID = repo.getMaxEventId() + + // Act + repo.registerEvents(records) + logger.info("Registered events") + + // Retrieve the events through the interface + List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, 
RECORD_COUNT * 2) + logger.info("Recovered ${recoveredRecords.size()} records") + + // Assert + recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i -> + assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i + assert recoveredRecord.getTransitUri() == TRANSIT_URI + assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE + // The UUID was added later but we care that all attributes we provided are still there + assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet()) + assert recoveredRecord.getAttribute("count") == i as String + } + } + + private static class ReportedEvent { + final Severity severity + final String category + final String message + + ReportedEvent(final Severity severity, final String category, final String message) { + this.severity = severity + this.category = category + this.message = message + } + + @Override + String toString() { + "ReportedEvent [${severity}] ${category}: ${message}" + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java index 36397c4..4b2ca50 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; - import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.toc.StandardTocReader; @@ -67,19 +66,25 @@ public abstract class AbstractTestRecordReaderWriter { writer.close(); final TocReader tocReader = new StandardTocReader(tocFile); + final String expectedTransitUri = "nifi://unit-test"; + final int expectedBlockIndex = 0; + + assertRecoveredRecord(journalFile, tocReader, expectedTransitUri, expectedBlockIndex); + + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + private void assertRecoveredRecord(File journalFile, TocReader tocReader, String expectedTransitUri, int expectedBlockIndex) throws IOException { try (final FileInputStream fis = new FileInputStream(journalFile); - final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) { - assertEquals(0, reader.getBlockIndex()); - reader.skipToBlock(0); + final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) { + assertEquals(expectedBlockIndex, reader.getBlockIndex()); + reader.skipToBlock(expectedBlockIndex); final StandardProvenanceEventRecord recovered = reader.nextRecord(); assertNotNull(recovered); - assertEquals("nifi://unit-test", recovered.getTransitUri()); + assertEquals(expectedTransitUri, recovered.getTransitUri()); assertNull(reader.nextRecord()); } - - FileUtils.deleteFile(journalFile.getParentFile(), true); } @@ -96,16 +101,7 @@ public abstract class AbstractTestRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); - try (final FileInputStream fis = new FileInputStream(journalFile); - final RecordReader reader = 
createReader(fis, journalFile.getName(), tocReader, 2048)) { - assertEquals(0, reader.getBlockIndex()); - reader.skipToBlock(0); - final StandardProvenanceEventRecord recovered = reader.nextRecord(); - assertNotNull(recovered); - - assertEquals("nifi://unit-test", recovered.getTransitUri()); - assertNull(reader.nextRecord()); - } + assertRecoveredRecord(journalFile, tocReader, "nifi://unit-test", 0); FileUtils.deleteFile(journalFile.getParentFile(), true); } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index f08fed4..c3fbf42 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -16,6 +16,25 @@ */ package org.apache.nifi.provenance; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult.Result; @@ -42,26 +61,6 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.RingBuffer.IterationDirection; import org.apache.nifi.web.ResourceNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; - public class VolatileProvenanceRepository implements ProvenanceRepository { // properties @@ -472,7 +471,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { } public Lineage computeLineage(final String flowFileUUID, final NiFiUser user) throws IOException { - return computeLineage(Collections.<String>singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null); + return computeLineage(Collections.singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null); } private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType 
computationType, final Long eventId) throws IOException { @@ -497,7 +496,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId); if (event == null) { final String userId = user.getIdentity(); - final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, userId); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, userId); result.getResult().setError("Could not find event with ID " + eventId); lineageSubmissionMap.put(result.getLineageIdentifier(), result); return result; @@ -541,9 +540,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L); + submission.getResult().update(Collections.emptyList(), 0L); return submission; } @@ -554,7 +553,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { case CLONE: return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); return submission; @@ -572,9 +571,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { final ProvenanceEventRecord event = getEvent(eventId, user); if (event == null) { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L); + submission.getResult().update(Collections.emptyList(), 0L); return submission; } @@ -585,7 +584,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { case CLONE: return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId); default: { - final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); return submission; @@ -873,5 +872,15 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { public Long getPreviousContentClaimOffset() { return 
record.getPreviousContentClaimOffset(); } + + /** + * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability). + * + * @return a descriptive event ID to allow tracing + */ + @Override + public String getBestEventIdentifier() { + return Long.toString(getEventId()); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index d0a1f1c..0b206e2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -437,7 +437,7 @@ <exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude> <exclude>src/test/resources/TestExtractGrok/patterns</exclude> <!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 --> - <exclude>src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java</exclude> + <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index 103790e..db6d9ba 100644 
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@ -16,6 +16,16 @@ */ package org.apache.nifi.processors.standard; +import java.nio.charset.StandardCharsets; +import java.security.Security; +import java.text.Normalizer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; @@ -41,27 +51,16 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.crypto.CipherUtility; -import org.apache.nifi.processors.standard.util.crypto.KeyedEncryptor; -import org.apache.nifi.processors.standard.util.crypto.OpenPGPKeyBasedEncryptor; -import org.apache.nifi.processors.standard.util.crypto.OpenPGPPasswordBasedEncryptor; -import org.apache.nifi.processors.standard.util.crypto.PasswordBasedEncryptor; import org.apache.nifi.security.util.EncryptionMethod; import org.apache.nifi.security.util.KeyDerivationFunction; +import org.apache.nifi.security.util.crypto.CipherUtility; +import org.apache.nifi.security.util.crypto.KeyedEncryptor; +import org.apache.nifi.security.util.crypto.OpenPGPKeyBasedEncryptor; +import org.apache.nifi.security.util.crypto.OpenPGPPasswordBasedEncryptor; +import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor; import org.apache.nifi.util.StopWatch; import org.bouncycastle.jce.provider.BouncyCastleProvider; -import 
java.nio.charset.StandardCharsets; -import java.security.Security; -import java.text.Normalizer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - @EventDriven @SideEffectFree @SupportsBatching http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java deleted file mode 100644 index 907aed2..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard.util.crypto; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.security.util.EncryptionMethod; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.crypto.Cipher; -import javax.crypto.NoSuchPaddingException; -import javax.crypto.SecretKey; -import javax.crypto.spec.IvParameterSpec; -import java.io.UnsupportedEncodingException; -import java.security.InvalidAlgorithmParameterException; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.SecureRandom; -import java.security.spec.InvalidKeySpecException; -import java.util.Arrays; -import java.util.List; - -/** - * This is a standard implementation of {@link KeyedCipherProvider} which supports {@code AES} cipher families with arbitrary modes of operation (currently only {@code CBC}, {@code CTR}, and {@code - * GCM} are supported as {@link EncryptionMethod}s. - */ -public class AESKeyedCipherProvider extends KeyedCipherProvider { - private static final Logger logger = LoggerFactory.getLogger(AESKeyedCipherProvider.class); - private static final int IV_LENGTH = 16; - private static final List<Integer> VALID_KEY_LENGTHS = Arrays.asList(128, 192, 256); - - /** - * Returns an initialized cipher for the specified algorithm. The IV is provided externally to allow for non-deterministic IVs, as IVs - * deterministically derived from the password are a potential vulnerability and compromise semantic security. 
See - * <a href="http://crypto.stackexchange.com/a/3970/12569">Ilmari Karonen's answer on Crypto Stack Exchange</a> - * - * @param encryptionMethod the {@link EncryptionMethod} - * @param key the key - * @param iv the IV or nonce (cannot be all 0x00) - * @param encryptMode true for encrypt, false for decrypt - * @return the initialized cipher - * @throws Exception if there is a problem initializing the cipher - */ - @Override - public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv, boolean encryptMode) throws Exception { - try { - return getInitializedCipher(encryptionMethod, key, iv, encryptMode); - } catch (IllegalArgumentException e) { - throw e; - } catch (Exception e) { - throw new ProcessException("Error initializing the cipher", e); - } - } - - /** - * Returns an initialized cipher for the specified algorithm. The IV will be generated internally (for encryption). If decryption is requested, it will throw an exception. - * - * @param encryptionMethod the {@link EncryptionMethod} - * @param key the key - * @param encryptMode true for encrypt, false for decrypt - * @return the initialized cipher - * @throws Exception if there is a problem initializing the cipher or if decryption is requested - */ - @Override - public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, boolean encryptMode) throws Exception { - return getCipher(encryptionMethod, key, new byte[0], encryptMode); - } - - protected Cipher getInitializedCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv, - boolean encryptMode) throws NoSuchAlgorithmException, NoSuchProviderException, - InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, UnsupportedEncodingException { - if (encryptionMethod == null) { - throw new IllegalArgumentException("The encryption method must be specified"); - } - - if (!encryptionMethod.isKeyedCipher()) { - throw new IllegalArgumentException(encryptionMethod.name() 
+ " requires a PBECipherProvider"); - } - - String algorithm = encryptionMethod.getAlgorithm(); - String provider = encryptionMethod.getProvider(); - - if (key == null) { - throw new IllegalArgumentException("The key must be specified"); - } - - if (!isValidKeyLength(key)) { - throw new IllegalArgumentException("The key must be of length [" + StringUtils.join(VALID_KEY_LENGTHS, ", ") + "]"); - } - - Cipher cipher = Cipher.getInstance(algorithm, provider); - final String operation = encryptMode ? "encrypt" : "decrypt"; - - boolean ivIsInvalid = false; - - // If an IV was not provided already, generate a random IV and inject it in the cipher - int ivLength = cipher.getBlockSize(); - if (iv.length != ivLength) { - logger.warn("An IV was provided of length {} bytes for {}ion but should be {} bytes", iv.length, operation, ivLength); - ivIsInvalid = true; - } - - final byte[] emptyIv = new byte[ivLength]; - if (Arrays.equals(iv, emptyIv)) { - logger.warn("An empty IV was provided of length {} for {}ion", iv.length, operation); - ivIsInvalid = true; - } - - if (ivIsInvalid) { - if (encryptMode) { - logger.warn("Generating new IV. The value can be obtained in the calling code by invoking 'cipher.getIV()';"); - iv = generateIV(); - } else { - // Can't decrypt without an IV - throw new IllegalArgumentException("Cannot decrypt without a valid IV"); - } - } - cipher.init(encryptMode ? Cipher.ENCRYPT_MODE : Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv)); - - return cipher; - } - - private boolean isValidKeyLength(SecretKey key) { - return VALID_KEY_LENGTHS.contains(key.getEncoded().length * 8); - } - - /** - * Generates a new random IV of 16 bytes using {@link java.security.SecureRandom}. - * - * @return the IV - */ - public byte[] generateIV() { - byte[] iv = new byte[IV_LENGTH]; - new SecureRandom().nextBytes(iv); - return iv; - } -}
