This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new c19db9d NIFI-7375: This closes #4218. Fixed a bug that caused Provenance Events not to show up in specific situations when clicking View Provenance for a Processor. - Added System-level tests for Provenance repository to reproduce behavior. - Added a Provenance Client to the CLI, which is necessary for System-level tests. - Added small additional configuration for Provenance repository to simplify development of system tests - Minor improvements to system tests (such as ability [...] c19db9d is described below commit c19db9d623455b157d59c75c49f4ba0726693b43 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Wed Apr 15 12:24:46 2020 -0400 NIFI-7375: This closes #4218. Fixed a bug that caused Provenance Events not to show up in specific situations when clicking View Provenance for a Processor. - Added System-level tests for Provenance repository to reproduce behavior. - Added a Provenance Client to the CLI, which is necessary for System-level tests. 
- Added small additional configuration for Provenance repository to simplify development of system tests - Minor improvements to system tests (such as ability to destroy environment between tests) needed for Provenance repository based system tests Signed-off-by: Joe Witt <joew...@apache.org> --- .../nifi/provenance/StandardQueryResult.java | 10 +- .../java/org/apache/nifi/util/NiFiProperties.java | 1 + .../nifi/provenance/RepositoryConfiguration.java | 18 +- .../lucene/LatestEventsPerProcessorQuery.java | 5 + .../provenance/index/lucene/LatestEventsQuery.java | 4 + .../provenance/index/lucene/LuceneEventIndex.java | 5 +- .../nifi/provenance/index/lucene/QueryTask.java | 12 +- .../provenance/store/PartitionedEventStore.java | 3 +- .../provenance/store/WriteAheadStorePartition.java | 27 ++- .../processors/tests/system/TerminateFlowFile.java | 35 ++++ .../services/org.apache.nifi.processor.Processor | 5 +- .../nifi/tests/system/InstanceConfiguration.java | 18 ++ .../apache/nifi/tests/system/NiFiClientUtil.java | 46 +++++ .../org/apache/nifi/tests/system/NiFiSystemIT.java | 21 ++- .../SpawnedStandaloneNiFiInstanceFactory.java | 18 ++ .../system/provenance/ProvenanceRepositoryIT.java | 205 +++++++++++++++++++++ .../test/resources/conf/default/nifi.properties | 1 + .../toolkit/cli/impl/client/NiFiClientFactory.java | 16 ++ .../toolkit/cli/impl/client/nifi/NiFiClient.java | 8 + .../cli/impl/client/nifi/ProvenanceClient.java | 36 ++++ .../impl/client/nifi/impl/JerseyNiFiClient.java | 18 ++ .../client/nifi/impl/JerseyProvenanceClient.java | 120 ++++++++++++ 22 files changed, 613 insertions(+), 19 deletions(-) diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java index 2777339..eab3a46 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java 
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java @@ -48,6 +48,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult { private final Lock writeLock = rwLock.writeLock(); // guarded by writeLock private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator()); + private long hitCount = 0L; private int numCompletedSteps = 0; private Date expirationDate; private String error; @@ -163,6 +164,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult { } this.matchingRecords.addAll(newEvents); + hitCount += totalHits; // If we've added more records than the query's max, then remove the trailing elements. // We do this, rather than avoiding the addition of the elements because we want to choose @@ -188,10 +190,12 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult { queryComplete = true; if (numCompletedSteps >= numSteps) { - logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime); + logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files.", + query, numSteps, queryTime, hitCount, matchingRecords.size()); } else { - logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)", - query, numSteps, queryTime, numCompletedSteps); + logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files. 
" + + "Only completed {} steps because the maximum number of results was reached.", + query, numSteps, queryTime, hitCount, matchingRecords.size(), numCompletedSteps); } } } finally { diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 3fea5df..6bbc921 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -126,6 +126,7 @@ public abstract class NiFiProperties { public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size"; public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time"; public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size"; + public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = "nifi.provenance.repository.rollover.events"; public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads"; public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads"; public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover"; diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index b04bca7..53549aa 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -39,6 +39,7 @@ public class RepositoryConfiguration { public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads"; public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency"; + public static final String MAINTENACE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency"; private final Map<String, File> storageDirectories = new LinkedHashMap<>(); private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS); @@ -51,6 +52,7 @@ public class RepositoryConfiguration { private int compressionBlockBytes = 1024 * 1024; private int maxAttributeChars = 65536; private int debugFrequency = 1_000_000; + private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L); // TODO: Delegaate to RepositoryEncryptionConfiguration in NIFI-6617 private Map<String, String> encryptionKeys; @@ -416,6 +418,14 @@ public class RepositoryConfiguration { this.debugFrequency = debugFrequency; } + public long getMaintenanceFrequency(final TimeUnit timeUnit) { + return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS); + } + + public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) { + this.maintenanceFrequencyMillis = timeUnit.toMillis(period); + } + public static RepositoryConfiguration create(final NiFiProperties nifiProperties) { final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths(); @@ -426,13 +436,14 @@ public class RepositoryConfiguration { final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB"); final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins"); final String rolloverSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB"); + final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE); final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB"); final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2); final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2); final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16); final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2); final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY); - + final String maintenanceFrequency = nifiProperties.getProperty(MAINTENACE_FREQUENCY); final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS); final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue(); final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS); @@ -475,6 +486,7 @@ public class RepositoryConfiguration { config.setSearchableFields(searchableFields); config.setSearchableAttributes(searchableAttributes); config.setMaxEventFileCapacity(rolloverBytes); + config.setMaxEventFileCount(rolloverEventCount); config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS); config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS); config.setMaxStorageCapacity(maxStorageBytes); @@ -490,6 +502,10 @@ public class RepositoryConfiguration { if (shardSize != null) { config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); } + if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) { + final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS); + 
config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS); + } config.setAlwaysSync(alwaysSync); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java index 73b0a14..1c09d57 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java @@ -74,4 +74,9 @@ public class LatestEventsPerProcessorQuery implements CachedQuery { return Optional.of(eventIds); } + @Override + public String toString() { + return "Latest Events Per Processor"; + } + } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java index 94cd013..6002521 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java @@ -52,4 +52,8 @@ public class LatestEventsQuery implements CachedQuery { } } + @Override + public String toString() { + return "Most Recent Events Query"; + } } diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index f4b96c6..f8cf709 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -606,11 +606,14 @@ public class LuceneEventIndex implements EventIndex { querySubmissionMap.put(query.getIdentifier(), submission); final List<Long> eventIds = eventIdListOption.get(); + logger.debug("Cached Query {} produced {} Event IDs for {}: {}", cachedQuery, eventIds.size(), query, eventIds); queryExecutor.submit(() -> { List<ProvenanceEventRecord> events; try { events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER); + logger.debug("Retrieved {} of {} Events from Event Store", events.size(), eventIds.size()); + submission.getResult().update(events, eventIds.size()); } catch (final Exception e) { submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details"); @@ -639,7 +642,7 @@ public class LuceneEventIndex implements EventIndex { querySubmissionMap.put(query.getIdentifier(), submission); final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query); - logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories); + logger.debug("Submitting query {} with identifier {} against {} index directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), indexDirectories); if 
(indexDirectories.isEmpty()) { submission.getResult().update(Collections.emptyList(), 0L); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java index 13c0367..368a19d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java @@ -19,6 +19,8 @@ package org.apache.nifi.provenance.index.lucene; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; import org.apache.nifi.provenance.ProgressiveResult; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -105,7 +107,7 @@ public class QueryTask implements Runnable { try { final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart); - logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis); + logger.trace("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis); final long startNanos = System.nanoTime(); // If max number of results are retrieved, do not bother querying lucene @@ -124,7 +126,11 @@ public class QueryTask implements Runnable { final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader(); final TopDocs topDocs; try { - topDocs = searcher.getIndexSearcher().search(query, maxResults); + + // Sort based on document id, descending. This gives us most recent events first. 
+ final Sort sort = new Sort(new SortField(null, SortField.Type.DOC, true)); + + topDocs = searcher.getIndexSearcher().search(query, maxResults, sort); } catch (final Exception e) { logger.error("Failed to query Lucene for index " + indexDir, e); queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e); @@ -189,7 +195,7 @@ public class QueryTask implements Runnable { final long endConvert = System.nanoTime(); final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start); - logger.debug("Converting documents took {} ms", ms); + logger.trace("Converting documents took {} ms", ms); List<ProvenanceEventRecord> events; try { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java index 782e7ab..a320f5a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java @@ -62,7 +62,8 @@ public abstract class PartitionedEventStore implements EventStore { @Override public void initialize() throws IOException { maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance")); - maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES); + final long maintenanceMillis = repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS); + maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, maintenanceMillis, maintenanceMillis, TimeUnit.MILLISECONDS); for (final EventStorePartition partition : 
getPartitions()) { partition.initialize(); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 68d2b3d..9d805f9 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -175,7 +175,7 @@ public class WriteAheadStorePartition implements EventStorePartition { } // Claim a Record Writer Lease so that we have a writer to persist the events to - RecordWriterLease lease = null; + RecordWriterLease lease; while (true) { lease = getLease(); if (lease.tryClaim()) { @@ -185,7 +185,6 @@ public class WriteAheadStorePartition implements EventStorePartition { final RolloverState rolloverState = lease.getRolloverState(); if (rolloverState.isRollover()) { final boolean success = tryRollover(lease); - if (success) { logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState); } @@ -476,9 +475,16 @@ public class WriteAheadStorePartition implements EventStorePartition { public void purgeOldEvents(final long olderThan, final TimeUnit unit) { final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan); - getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff) + final List<File> removed = getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff) .sorted(DirectoryUtils.SMALLEST_ID_FIRST) - .forEach(this::delete); + .filter(this::delete) + .collect(Collectors.toList()); + + if (removed.isEmpty()) { + logger.debug("No 
Provenance Event files that exceed time-based threshold of {} {}", olderThan, unit); + } else { + logger.info("Purged {} Provenance Event files from Provenance Repository because the events were older than {} {}: {}", removed.size(), olderThan, unit, removed); + } } private File getActiveEventFile() { @@ -489,20 +495,27 @@ public class WriteAheadStorePartition implements EventStorePartition { @Override public long purgeOldestEvents() { final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList()); - if (eventFiles.isEmpty()) { + if (eventFiles.size() < 2) { + // If there are no Event Files, there's nothing to do. If there is exactly 1 Event File, it means that the only Event File + // that exists is the Active Event File, which we are writing to, so we don't want to remove it either. return 0L; } final File currentFile = getActiveEventFile(); + if (currentFile == null) { + logger.debug("There is currently no Active Event File for {}. Will not purge oldest events until the Active Event File has been established.", this); + return 0L; + } + for (final File eventFile : eventFiles) { if (eventFile.equals(currentFile)) { - continue; + break; } final long fileSize = eventFile.length(); if (delete(eventFile)) { - logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize)); + logger.info("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize)); return fileSize; } else { logger.warn("{} Failed to delete oldest event file {}. 
This file should be cleaned up manually.", this, eventFile); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java new file mode 100644 index 0000000..734766f --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +public class TerminateFlowFile extends AbstractProcessor { + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + session.remove(flowFile); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 003c917..6d30e3c 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,8 +14,9 @@ # limitations under the License. 
org.apache.nifi.processors.tests.system.CountEvents +org.apache.nifi.processors.tests.system.FakeProcessor +org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor org.apache.nifi.processors.tests.system.GenerateFlowFile org.apache.nifi.processors.tests.system.Sleep +org.apache.nifi.processors.tests.system.TerminateFlowFile org.apache.nifi.processors.tests.system.ValidateFileExists -org.apache.nifi.processors.tests.system.FakeProcessor -org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor \ No newline at end of file diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java index 85a839c..28dc2ec 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/InstanceConfiguration.java @@ -18,6 +18,8 @@ package org.apache.nifi.tests.system; import java.io.File; import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Map; public class InstanceConfiguration { private final File bootstrapConfigFile; @@ -25,6 +27,7 @@ public class InstanceConfiguration { private final File flowXmlGz; private final File stateDirectory; private final boolean autoStart; + private final Map<String, String> nifiPropertiesOverrides; private InstanceConfiguration(Builder builder) { this.bootstrapConfigFile = builder.bootstrapConfigFile; @@ -32,6 +35,7 @@ public class InstanceConfiguration { this.flowXmlGz = builder.flowXmlGz; this.stateDirectory = builder.stateDirectory; this.autoStart = builder.autoStart; + this.nifiPropertiesOverrides = builder.nifiPropertiesOverrides; } public File getBootstrapConfigFile() { @@ -54,12 +58,26 @@ public class InstanceConfiguration { return autoStart; } + public Map<String, 
String> getNifiPropertiesOverrides() { + return nifiPropertiesOverrides; + } + public static class Builder { private File bootstrapConfigFile; private File instanceDirectory; private File flowXmlGz; private File stateDirectory; private boolean autoStart = true; + private final Map<String, String> nifiPropertiesOverrides = new HashMap<>(); + + public Builder overrideNifiProperties(final Map<String, String> overrides) { + nifiPropertiesOverrides.clear(); + if (overrides != null) { + nifiPropertiesOverrides.putAll(overrides); + } + + return this; + } public Builder bootstrapConfig(final File configFile) { if (!configFile.exists()) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index bb7b54b..a9806a0 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -20,6 +20,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient; @@ -47,6 +48,8 @@ import org.apache.nifi.web.api.dto.VariableDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; import 
org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; @@ -67,6 +70,7 @@ import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.VariableEntity; @@ -80,6 +84,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -772,4 +777,45 @@ public class NiFiClientUtil { final ProcessGroupEntity childGroup = nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, childGroupEntity); return childGroup; } + + public ProvenanceEntity queryProvenance(final Map<SearchableField, String> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException { + final Map<String, String> searchTermsAsStrings = searchTerms.entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue)); + + final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO(); + requestDto.setSearchTerms(searchTermsAsStrings); + requestDto.setSummarize(false); + requestDto.setStartDate(startTime == null ? null : new Date(startTime)); + requestDto.setEndDate(endTime == null ? 
null : new Date(endTime)); + requestDto.setMaxResults(1000); + + final ProvenanceDTO dto = new ProvenanceDTO(); + dto.setRequest(requestDto); + dto.setSubmissionTime(new Date()); + + final ProvenanceEntity entity = new ProvenanceEntity(); + entity.setProvenance(dto); + + ProvenanceEntity responseEntity = nifiClient.getProvenanceClient().submitProvenanceQuery(entity); + + try { + responseEntity = waitForComplete(responseEntity); + } catch (final InterruptedException ie) { + Assert.fail("Interrupted while waiting for Provenance Query to complete"); + } + + nifiClient.getProvenanceClient().deleteProvenanceQuery(responseEntity.getProvenance().getId()); + return responseEntity; + } + + public ProvenanceEntity waitForComplete(final ProvenanceEntity entity) throws InterruptedException, NiFiClientException, IOException { + ProvenanceEntity current = entity; + while (current.getProvenance().isFinished() != Boolean.TRUE) { + Thread.sleep(100L); + current = nifiClient.getProvenanceClient().getProvenanceQuery(entity.getProvenance().getId()); + } + + return current; + } + } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 04a376b..1e37b01 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -33,6 +33,8 @@ import org.junit.rules.Timeout; import java.io.File; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; @@ -98,6 +100,10 @@ public abstract class NiFiSystemIT { if (isDestroyFlowAfterEachTest()) { destroyFlow(); } + + if (isDestroyEnvironmentAfterEachTest()) { + cleanup(); 
+            }
         } finally {
             if (nifiClient != null) {
                 nifiClient.close();
@@ -105,6 +111,10 @@ public abstract class NiFiSystemIT {
         }
     }
 
+    /**
+     * Hook for subclasses: when this returns true, cleanup() is invoked after each test.
+     *
+     * @return false by default
+     */
+    protected boolean isDestroyEnvironmentAfterEachTest() {
+        return false;
+    }
+
     protected void destroyFlow() throws NiFiClientException, IOException {
         getClientUtil().stopProcessGroupComponents("root");
         getClientUtil().disableControllerServices("root");
@@ -229,16 +239,25 @@ public abstract class NiFiSystemIT {
             new InstanceConfiguration.Builder()
                 .bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
                 .instanceDirectory("target/standalone-instance")
+                .overrideNifiProperties(getNifiPropertiesOverrides())
                 .build());
     }
 
+    /**
+     * Hook for subclasses: the returned entries are handed to the InstanceConfiguration builder
+     * via overrideNifiProperties when the standalone instance configuration is built.
+     *
+     * @return an empty map by default
+     */
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        return Collections.emptyMap();
+    }
+
     protected boolean isDestroyFlowAfterEachTest() {
         return true;
     }
 
+    /**
+     * Blocks until the condition evaluates to true, re-checking it every 10 milliseconds.
+     */
     protected void waitFor(final BooleanSupplier condition) throws InterruptedException {
+        waitFor(condition, 10L);
+    }
+
+    /**
+     * Blocks until the condition evaluates to true, sleeping for the given delay between evaluations.
+     * This method applies no timeout of its own.
+     *
+     * @param condition the condition to evaluate
+     * @param delayMillis number of milliseconds to sleep after each evaluation that returns false
+     */
+    protected void waitFor(final BooleanSupplier condition, final long delayMillis) throws InterruptedException {
         while (!condition.getAsBoolean()) {
-            Thread.sleep(10L);
+            Thread.sleep(delayMillis);
         }
     }
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index 081eb1e..c19a755 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -133,6 +133,24 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
             final File destinationFlowXmlGz = new File(destinationConf, "flow.xml.gz");
             Files.copy(flowXmlGz.toPath(), destinationFlowXmlGz.toPath());
         }
+
+        // Write out any Property overrides
+        final
Map<String, String> nifiPropertiesOverrides = instanceConfiguration.getNifiPropertiesOverrides();
+        if (nifiPropertiesOverrides != null && !nifiPropertiesOverrides.isEmpty()) {
+            final File destinationNifiProperties = new File(destinationConf, "nifi.properties");
+            final File sourceNifiProperties = new File(bootstrapConfigFile.getParentFile(), "nifi.properties");
+
+            final Properties nifiProperties = new Properties();
+            try (final InputStream fis = new FileInputStream(sourceNifiProperties)) {
+                nifiProperties.load(fis);
+            }
+
+            nifiPropertiesOverrides.forEach(nifiProperties::setProperty);
+
+            try (final OutputStream fos = new FileOutputStream(destinationNifiProperties)) {
+                nifiProperties.store(fos, null);
+            }
+        }
     }
 
     private void copyContents(final File dir, final File destinationDir) throws IOException {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
new file mode 100644
index 0000000..2a5481d
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * System tests that query the Provenance Repository by Component ID, covering the cases where
+ * earlier events for the component have aged off, both with and without a restart of NiFi in between.
+ */
+public class ProvenanceRepositoryIT extends NiFiSystemIT {
+
+    @Override
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        final Map<String, String> properties = new HashMap<>();
+
+        // Force only a single Provenance Event File to exist
+        properties.put("nifi.provenance.repository.max.storage.size", "1 KB");
+
+        // Perform maintenance every 2 seconds to ensure that we don't have to wait a long time for old event files to roll off.
+        properties.put("nifi.provenance.repository.maintenance.frequency", "2 secs");
+
+        return properties;
+    }
+
+    @Override
+    protected boolean isDestroyEnvironmentAfterEachTest() {
+        // We need to destroy entire environment after each test to ensure that the repositories are destroyed.
+        // This is important because we are expecting exact numbers of events in the repo.
+        return true;
+    }
+
+    @Test
+    public void testSimpleQueryByComponentID() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
+        getClientUtil().setAutoTerminatedRelationships(count, "success");
+        getClientUtil().createConnection(generateFlowFile, count, "success");
+
+        final Map<SearchableField, String> searchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
+
+        // Take the baseline before any processor is started. Asserting "no events yet" after the start
+        // would race against the first event being generated and indexed.
+        ProvenanceEntity provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, null);
+        assertEquals(0, provenanceEntity.getProvenance().getResults().getProvenanceEvents().size());
+
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getNifiClient().getProcessorClient().startProcessor(count);
+
+        // Wait for there to be at least 1 event.
+        waitForEventCountAtLeast(searchTerms, 1);
+
+        provenanceEntity = getClientUtil().queryProvenance(searchTerms, null, null);
+
+        final List<ProvenanceEventDTO> events = provenanceEntity.getProvenance().getResults().getProvenanceEvents();
+        assertEquals(1, events.size());
+
+        final ProvenanceEventDTO firstEvent = events.get(0);
+        assertEquals(ProvenanceEventType.CREATE.name(), firstEvent.getEventType());
+    }
+
+
+
+    // If we add some events for Component ABC and then they age off, we should be able to query and get back 0 results.
+    // If we then add some more events for Component ABC and query, we should see those new events. Even if we have aged off
+    // 1000+ events (1000 = max results of the provenance query). This should be true whether NiFi is restarted in between or not.
+    // To ensure this, we have two tests that are very similar but one restarts NiFi in between and one does not.
+    // This test does not restart NiFi.
+    @Test
+    public void testAgeOffEventsThenAddSomeThenQuery() throws NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
+        generateFlowFile = getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "800"));
+
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().setAutoTerminatedRelationships(terminate, "success");
+        getClientUtil().createConnection(generateFlowFile, terminate, "success");
+
+        generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
+
+        // Wait for there to be at least 800 events for Generate processor and then stop the processor
+        waitForEventCountAtLeast(generateSearchTerms, 800);
+        getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
+
+        // Start Terminate proc & wait for at least 600 events to be registered. We do this because each Event File can hold up to 1,000 Events.
+        // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
+        // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
+        // depending on when the query is executed.
+        getNifiClient().getProcessorClient().startProcessor(terminate);
+        final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
+        waitForEventCountAtLeast(terminateSearchTerms, 600);
+
+        waitForEventCountExactly(generateSearchTerms, 0);
+
+        // Emit 25 more events
+        getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25"));
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        // Wait for those 25 events to be emitted
+        waitForEventCountAtLeast(generateSearchTerms, 25);
+    }
+
+
+    // If we add some events for Component ABC and then they age off, we should be able to query and get back 0 results.
+    // If we then add some more events for Component ABC and query, we should see those new events. Even if we have aged off
+    // 1000+ events (1000 = max results of the provenance query). This should be true whether NiFi is restarted in between or not.
+    // To ensure this, we have two tests that are very similar but one restarts NiFi in between and one does not.
+    // This test does restart NiFi.
+    @Test
+    public void testAgeOffEventsThenRestartAddSomeThenQuery() throws NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
+        generateFlowFile = getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "800"));
+
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().setAutoTerminatedRelationships(terminate, "success");
+        getClientUtil().createConnection(generateFlowFile, terminate, "success");
+
+        generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        final Map<SearchableField, String> generateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, generateFlowFile.getId());
+
+        // Wait for there to be at least 800 events for Generate processor and then stop it
+        waitForEventCountAtLeast(generateSearchTerms, 800);
+        getNifiClient().getProcessorClient().stopProcessor(generateFlowFile);
+
+        // Start Terminate proc & wait for at least 600 events to be registered. We do this because each Event File can hold up to 1,000 Events.
+        // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
+        // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
+        // depending on when the query is executed.
+        getNifiClient().getProcessorClient().startProcessor(terminate);
+        final Map<SearchableField, String> terminateSearchTerms = Collections.singletonMap(SearchableFields.ComponentID, terminate.getId());
+        waitForEventCountAtLeast(terminateSearchTerms, 600);
+        getNifiClient().getProcessorClient().stopProcessor(terminate);
+
+        waitForEventCountExactly(generateSearchTerms, 0);
+
+        // Restart NiFi. We do this so that when we query provenance for the Processor we won't be able to use the "Cached" events
+        // and will instead have to query Lucene
+        getNiFiInstance().stop();
+        getNiFiInstance().start();
+
+        // Ensure that Terminate processor is stopped, since nifi could have shutdown before persisting flow.xml.gz
+        terminate.getRevision().setVersion(0L); // Reset the revision
+        getNifiClient().getProcessorClient().stopProcessor(terminate);
+        getClientUtil().waitForStoppedProcessor(terminate.getId());
+
+        // Emit 400 more events
+        generateFlowFile.getRevision().setVersion(0L); // Reset the revision
+        getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400"));
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+
+        // Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately.
+        // This will leave us with only the 400 newly created events.
+        waitForEventCountExactly(generateSearchTerms, 400);
+    }
+
+    /** Waits until a provenance query for the given search terms returns exactly the expected number of events. */
+    private void waitForEventCountExactly(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
+        waitForEventCount(searchTerms, count -> count == expectedCount);
+    }
+
+    /** Waits until a provenance query for the given search terms returns at least the expected number of events. */
+    private void waitForEventCountAtLeast(final Map<SearchableField, String> searchTerms, final int expectedCount) throws InterruptedException {
+        waitForEventCount(searchTerms, count -> count >= expectedCount);
+    }
+
+    /**
+     * Re-runs the provenance query every 500 milliseconds until the number of returned events satisfies the predicate.
+     */
+    private void waitForEventCount(final Map<SearchableField, String> searchTerms, final Predicate<Integer> predicate) throws InterruptedException {
+        waitFor(() -> {
+            try {
+                return predicate.test(getEventCount(searchTerms));
+            } catch (final Exception ignored) {
+                // A failed query is deliberately treated as "condition not met yet" so that polling continues.
+                // NOTE(review): presumably this tolerates transient query failures; confirm a test-level timeout bounds the loop.
+                return false;
+            }
+        }, 500L);
+    }
+
+    /** Runs a provenance query for the given search terms and returns the number of events in the result. */
+    private int getEventCount(final Map<SearchableField, String> searchTerms) throws NiFiClientException, IOException {
+        ProvenanceEntity provEntity = getClientUtil().queryProvenance(searchTerms, null, null);
+        return provEntity.getProvenance().getResults().getProvenanceEvents().size();
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 0b4aa9d..8a37b68 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -96,6 +96,7 @@ nifi.provenance.repository.directory.default=./provenance_repository
 nifi.provenance.repository.max.storage.time=24 hours
 nifi.provenance.repository.max.storage.size=1 GB
 nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.events=1000
 nifi.provenance.repository.rollover.size=100 MB
 nifi.provenance.repository.query.threads=2
 nifi.provenance.repository.index.threads=2
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
index 499157c..804a60d 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/NiFiClientFactory.java
@@ -33,6 +33,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@@ -373,6 +374,21 @@ public class NiFiClientFactory implements ClientFactory<NiFiClient> {
         }
 
+        // The three ProvenanceClient accessors below delegate directly to the wrapped client.
         @Override
+        public ProvenanceClient getProvenanceClient() {
+            return wrappedClient.getProvenanceClient();
+        }
+
+        @Override
+        public ProvenanceClient getProvenanceClientForProxiedEntities(final String... proxiedEntity) {
+            return wrappedClient.getProvenanceClientForProxiedEntities(proxiedEntity);
+        }
+
+        @Override
+        public ProvenanceClient getProvenanceClientForToken(final String token) {
+            return wrappedClient.getProvenanceClientForToken(token);
+        }
+
+        @Override
         public void close() throws IOException {
             wrappedClient.close();
         }
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
index 3c66fe3..1a7e7c2 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/NiFiClient.java
@@ -184,6 +184,14 @@ public interface NiFiClient extends Closeable {
     OutputPortClient getOutputPortClientForToken(String token);
 
+    // ----- ProvenanceClient -----
+
+    // Client for provenance queries and lineage requests.
+    ProvenanceClient getProvenanceClient();
+
+    // Same client type, obtained for the given proxied entities.
+    ProvenanceClient getProvenanceClientForProxiedEntities(String... proxiedEntity);
+
+    // Same client type, obtained for the given token.
+    ProvenanceClient getProvenanceClientForToken(String token);
+
     /**
      * The builder interface that implementations should provide for obtaining the client.
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
new file mode 100644
index 0000000..60774c9
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.client.nifi;
+
+import org.apache.nifi.web.api.entity.LineageEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+
+import java.io.IOException;
+
+/**
+ * Client operations for Provenance Queries and Provenance Lineage Requests.
+ * Each kind of request can be submitted, retrieved by ID and deleted by ID.
+ */
+public interface ProvenanceClient {
+    /**
+     * Submits a new Provenance Query.
+     *
+     * @param provenanceQuery the entity describing the query to submit
+     * @return the entity returned for the submitted query
+     */
+    ProvenanceEntity submitProvenanceQuery(ProvenanceEntity provenanceQuery) throws NiFiClientException, IOException;
+
+    /**
+     * Retrieves a previously submitted Provenance Query.
+     *
+     * @param queryId the ID of the query
+     * @return the entity returned for the query
+     */
+    ProvenanceEntity getProvenanceQuery(String queryId) throws NiFiClientException, IOException;
+
+    /**
+     * Deletes a previously submitted Provenance Query.
+     *
+     * @param queryId the ID of the query
+     * @return the entity returned for the deleted query
+     */
+    ProvenanceEntity deleteProvenanceQuery(String queryId) throws NiFiClientException, IOException;
+
+    /**
+     * Submits a new Provenance Lineage Request.
+     *
+     * @param lineageEntity the entity describing the lineage request to submit
+     * @return the entity returned for the submitted request
+     */
+    LineageEntity submitLineageRequest(LineageEntity lineageEntity) throws NiFiClientException, IOException;
+
+    /**
+     * Retrieves a previously submitted Provenance Lineage Request.
+     *
+     * @param lineageRequestId the ID of the lineage request
+     * @return the entity returned for the request
+     */
+    LineageEntity getLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
+
+    /**
+     * Deletes a previously submitted Provenance Lineage Request.
+     *
+     * @param lineageRequestId the ID of the lineage request
+     * @return the entity returned for the deleted request
+     */
+    LineageEntity deleteLineageRequest(String lineageRequestId) throws NiFiClientException, IOException;
+}
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
index fbc71e6..e6b1014 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyNiFiClient.java
@@ -35,6 +35,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.PoliciesClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.RemoteProcessGroupClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.TemplatesClient;
@@ -393,6
+394,23 @@ public class JerseyNiFiClient implements NiFiClient {
     }
 
     @Override
+    public ProvenanceClient getProvenanceClient() {
+        return new JerseyProvenanceClient(baseTarget);
+    }
+
+    @Override
+    public ProvenanceClient getProvenanceClientForProxiedEntities(final String... proxiedEntity) {
+        final Map<String, String> headers = getHeaders(proxiedEntity);
+        return new JerseyProvenanceClient(baseTarget, headers);
+    }
+
+    @Override
+    public ProvenanceClient getProvenanceClientForToken(final String token) {
+        final Map<String, String> headers = getHeadersWithToken(token);
+        return new JerseyProvenanceClient(baseTarget, headers);
+    }
+
+    @Override
     public void close() {
         if (this.client != null) {
             try {
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
new file mode 100644
index 0000000..1cbdad3
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.client.nifi.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient;
+import org.apache.nifi.web.api.entity.LineageEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * ProvenanceClient implementation that issues its requests against the "/provenance" path
+ * beneath the supplied base WebTarget. Lineage requests use the "lineage" sub-path.
+ */
+public class JerseyProvenanceClient extends AbstractJerseyClient implements ProvenanceClient {
+    private final WebTarget provenanceTarget;
+
+    /**
+     * Creates a client that passes an empty header map to the parent client.
+     *
+     * @param baseTarget the target beneath which the "/provenance" path is resolved
+     */
+    public JerseyProvenanceClient(final WebTarget baseTarget) {
+        this(baseTarget, Collections.emptyMap());
+    }
+
+    /**
+     * @param baseTarget the target beneath which the "/provenance" path is resolved
+     * @param headers the headers handed to the parent client
+     */
+    public JerseyProvenanceClient(final WebTarget baseTarget, final Map<String, String> headers) {
+        super(headers);
+        this.provenanceTarget = baseTarget.path("/provenance");
+    }
+
+    /**
+     * POSTs the given entity as JSON to the provenance target.
+     *
+     * @throws IllegalArgumentException if the entity is null
+     */
+    @Override
+    public ProvenanceEntity submitProvenanceQuery(final ProvenanceEntity provenanceEntity) throws NiFiClientException, IOException {
+        if (provenanceEntity == null) {
+            throw new IllegalArgumentException("Provenance entity cannot be null");
+        }
+
+        return executeAction("Error submitting Provenance Query", () -> getRequestBuilder(provenanceTarget).post(
+            Entity.entity(provenanceEntity, MediaType.APPLICATION_JSON_TYPE),
+            ProvenanceEntity.class
+        ));
+    }
+
+    /**
+     * GETs the Provenance Query with the given ID.
+     *
+     * @throws IllegalArgumentException if the ID is null or blank
+     */
+    @Override
+    public ProvenanceEntity getProvenanceQuery(final String queryId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(queryId)) {
+            throw new IllegalArgumentException("Query ID cannot be null or blank");
+        }
+
+        return executeAction("Error retrieving status of Provenance Query", () -> {
+            final WebTarget target = provenanceTarget.path("/{id}").resolveTemplate("id", queryId);
+            return getRequestBuilder(target).get(ProvenanceEntity.class);
+        });
+    }
+
+    /**
+     * DELETEs the Provenance Query with the given ID.
+     *
+     * @throws IllegalArgumentException if the ID is null or blank
+     */
+    @Override
+    public ProvenanceEntity deleteProvenanceQuery(final String provenanceQueryId) throws NiFiClientException, IOException {
+        // Reject blank IDs as well as null, consistent with getProvenanceQuery: a blank ID cannot address a query.
+        if (StringUtils.isBlank(provenanceQueryId)) {
+            throw new IllegalArgumentException("Provenance Query ID cannot be null or blank");
+        }
+
+        return executeAction("Error deleting Provenance Query", () -> {
+            final WebTarget target = provenanceTarget.path("/{id}")
+                .resolveTemplate("id", provenanceQueryId);
+
+            return getRequestBuilder(target).delete(ProvenanceEntity.class);
+        });
+    }
+
+    /**
+     * POSTs the given entity as JSON to the "lineage" sub-path of the provenance target.
+     *
+     * @throws IllegalArgumentException if the entity is null
+     */
+    @Override
+    public LineageEntity submitLineageRequest(final LineageEntity lineageEntity) throws NiFiClientException, IOException {
+        if (lineageEntity == null) {
+            throw new IllegalArgumentException("Lineage entity cannot be null");
+        }
+
+        return executeAction("Error submitting Provenance Lineage Request", () -> getRequestBuilder(provenanceTarget.path("lineage")).post(
+            Entity.entity(lineageEntity, MediaType.APPLICATION_JSON_TYPE),
+            LineageEntity.class
+        ));
+    }
+
+    /**
+     * GETs the Provenance Lineage Request with the given ID.
+     *
+     * @throws IllegalArgumentException if the ID is null or blank
+     */
+    @Override
+    public LineageEntity getLineageRequest(final String lineageRequestId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(lineageRequestId)) {
+            throw new IllegalArgumentException("Lineage Request ID cannot be null or blank");
+        }
+
+        return executeAction("Error retrieving status of Provenance Lineage Request", () -> {
+            final WebTarget target = provenanceTarget.path("/lineage/{id}").resolveTemplate("id", lineageRequestId);
+            return getRequestBuilder(target).get(LineageEntity.class);
+        });
+    }
+
+    /**
+     * DELETEs the Provenance Lineage Request with the given ID.
+     *
+     * @throws IllegalArgumentException if the ID is null or blank
+     */
+    @Override
+    public LineageEntity deleteLineageRequest(final String lineageRequestId) throws NiFiClientException, IOException {
+        // Reject blank IDs as well as null, consistent with getLineageRequest.
+        if (StringUtils.isBlank(lineageRequestId)) {
+            throw new IllegalArgumentException("Lineage Request ID cannot be null or blank");
+        }
+
+        return executeAction("Error deleting Provenance Lineage Request", () -> {
+            final WebTarget target = provenanceTarget.path("/lineage/{id}")
+                .resolveTemplate("id", lineageRequestId);
+
+            return getRequestBuilder(target).delete(LineageEntity.class);
+        });
+    }
+}