http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java deleted file mode 100644 index 59d2308..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ /dev/null @@ -1,1096 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.FlowFilePrioritizer; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.FlowFileFilter; -import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; -import org.apache.nifi.processor.QueueSize; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.concurrency.TimedLock; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A FlowFileQueue is used to queue FlowFile objects that are awaiting further - * processing. Must be thread safe. - * - * @author none - */ -public final class StandardFlowFileQueue implements FlowFileQueue { - - public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000; - public static final int SWAP_RECORD_POLL_SIZE = 10000; - - // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this, - // we keep track of how often we are obtaining the write lock. If we exceed some threshold, we start performing a Pre-fetch so that - // we can then poll many times without having to obtain the lock. - // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch. - public static final int PREFETCH_POLL_THRESHOLD = 1000; - public static final int PRIORITIZED_PREFETCH_SIZE = 10; - public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000; - private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch? - - private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); - - private PriorityQueue<FlowFileRecord> activeQueue = null; - private long activeQueueContentSize = 0L; - private ArrayList<FlowFileRecord> swapQueue = null; - - private int swappedRecordCount = 0; - private long swappedContentSize = 0L; - private String maximumQueueDataSize; - private long maximumQueueByteCount; - private boolean swapMode = false; - private long maximumQueueObjectCount; - - private final AtomicLong flowFileExpirationMillis; - private final Connection connection; - private final AtomicReference<String> flowFileExpirationPeriod; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private final List<FlowFilePrioritizer> priorities; - private final int swapThreshold; - private final TimedLock readLock; - private final TimedLock writeLock; - private final String identifier; - - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0); - private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L)); - - // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! - private final ProcessScheduler scheduler; - - public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) { - activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); - priorities = new ArrayList<>(); - maximumQueueObjectCount = 0L; - maximumQueueDataSize = "0 MB"; - maximumQueueByteCount = 0L; - flowFileExpirationMillis = new AtomicLong(0); - flowFileExpirationPeriod = new AtomicReference<>("0 mins"); - swapQueue = new ArrayList<>(); - - this.identifier = identifier; - this.swapThreshold = swapThreshold; - this.scheduler = scheduler; - this.connection = connection; - - readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100); - writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100); - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public List<FlowFilePrioritizer> getPriorities() { - return Collections.unmodifiableList(priorities); - } - - @Override - public int getSwapThreshold() { - return swapThreshold; - } - - @Override - public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { - writeLock.lock(); - try { - final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(newPriorities)); - newQueue.addAll(activeQueue); - activeQueue = newQueue; - priorities.clear(); - priorities.addAll(newPriorities); - - if (newPriorities.isEmpty()) { - prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; - } else { - prefetchSize = PRIORITIZED_PREFETCH_SIZE; - } - } finally { - writeLock.unlock("setPriorities"); - } - } - - @Override - public void setBackPressureObjectThreshold(final long maxQueueSize) { - writeLock.lock(); - try { - maximumQueueObjectCount = maxQueueSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureObjectThreshold"); - } - } - - @Override - public long getBackPressureObjectThreshold() { - readLock.lock(); - try { - return maximumQueueObjectCount; - } finally { - readLock.unlock("getBackPressureObjectThreshold"); - } - } - - @Override - public void setBackPressureDataSizeThreshold(final String maxDataSize) { - writeLock.lock(); - try { - maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); - maximumQueueDataSize = maxDataSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureDataSizeThreshold"); - } - } - - @Override - public String getBackPressureDataSizeThreshold() { - readLock.lock(); - try { - return maximumQueueDataSize; - } finally { - readLock.unlock("getBackPressureDataSizeThreshold"); - } - } - - @Override - public QueueSize size() { - readLock.lock(); - try { - return getQueueSize(); - } finally { - readLock.unlock("getSize"); - } - } - - /** - * MUST be called with lock held - * - * @return - */ - private QueueSize getQueueSize() { - final QueueSize unacknowledged = unacknowledgedSizeRef.get(); - final PreFetch preFetch = preFetchRef.get(); - - final int preFetchCount; - final long preFetchSize; - if (preFetch == null) { - preFetchCount = 0; - preFetchSize = 0L; - } else { - final QueueSize preFetchQueueSize = preFetch.size(); - preFetchCount = preFetchQueueSize.getObjectCount(); - preFetchSize = preFetchQueueSize.getByteCount(); - } - - return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); - } - - @Override - public long contentSize() { - readLock.lock(); - try { - final PreFetch prefetch = preFetchRef.get(); - if (prefetch == null) { - return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount(); - } else { - return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount(); - } - } finally { - readLock.unlock("getContentSize"); - } - } - - @Override - public boolean isEmpty() { - readLock.lock(); - try { - final PreFetch prefetch = preFetchRef.get(); - if (prefetch == null) { - return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0; - } else { - return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0 && prefetch.size().getObjectCount() == 0; - } - } finally { - readLock.unlock("isEmpty"); - } - } - - @Override - public boolean isActiveQueueEmpty() { - final int activeQueueSize = activeQueueSizeRef.get(); - if (activeQueueSize == 0) { - final PreFetch preFetch = preFetchRef.get(); - if (preFetch == null) { - return true; - } - - final QueueSize queueSize = preFetch.size(); - return queueSize.getObjectCount() == 0; - } else { - return false; - } - } - - @Override - public QueueSize getActiveQueueSize() { - readLock.lock(); - try { - final PreFetch preFetch = preFetchRef.get(); - if (preFetch == null) { - return new QueueSize(activeQueue.size(), activeQueueContentSize); - } else { - final QueueSize preFetchSize = preFetch.size(); - return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount()); - } - } finally { - readLock.unlock("getActiveQueueSize"); - } - } - - @Override - public void acknowledge(final FlowFileRecord flowFile) { - if (queueFullRef.get()) { - writeLock.lock(); - try { - updateUnacknowledgedSize(-1, -flowFile.getSize()); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - updateUnacknowledgedSize(-1, -flowFile.getSize()); - } - - if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { - // queue was full but no longer is. Notify that the source may now be available to run, - // because of back pressure caused by this queue. - scheduler.registerEvent(connection.getSource()); - } - } - - @Override - public void acknowledge(final Collection<FlowFileRecord> flowFiles) { - long totalSize = 0L; - for (final FlowFileRecord flowFile : flowFiles) { - totalSize += flowFile.getSize(); - } - - if (queueFullRef.get()) { - writeLock.lock(); - try { - updateUnacknowledgedSize(-flowFiles.size(), -totalSize); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - updateUnacknowledgedSize(-flowFiles.size(), -totalSize); - } - - if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { - // it's possible that queue was full but no longer is. Notify that the source may now be available to run, - // because of back pressure caused by this queue. - scheduler.registerEvent(connection.getSource()); - } - } - - @Override - public boolean isFull() { - return queueFullRef.get(); - } - - /** - * MUST be called with either the read or write lock held - * - * @return - */ - private boolean determineIfFull() { - final long maxSize = maximumQueueObjectCount; - final long maxBytes = maximumQueueByteCount; - if (maxSize <= 0 && maxBytes <= 0) { - return false; - } - - final QueueSize queueSize = getQueueSize(); - if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { - return true; - } - - if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) { - return true; - } - - return false; - } - - @Override - public void put(final FlowFileRecord file) { - writeLock.lock(); - try { - if (swapMode || activeQueue.size() >= swapThreshold) { - swapQueue.add(file); - swappedContentSize += file.getSize(); - swappedRecordCount++; - swapMode = true; - } else { - activeQueueContentSize += file.getSize(); - activeQueue.add(file); - } - - queueFullRef.set(determineIfFull()); - } finally { - activeQueueSizeRef.set(activeQueue.size()); - writeLock.unlock("put(FlowFileRecord)"); - } - - if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { - scheduler.registerEvent(connection.getDestination()); - } - } - - @Override - public void putAll(final Collection<FlowFileRecord> files) { - final int numFiles = files.size(); - long bytes = 0L; - for (final FlowFile flowFile : files) { - bytes += flowFile.getSize(); - } - - writeLock.lock(); - try { - if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { - swapQueue.addAll(files); - swappedContentSize += bytes; - swappedRecordCount += numFiles; - swapMode = true; - } else { - activeQueueContentSize += bytes; - activeQueue.addAll(files); - } - - queueFullRef.set(determineIfFull()); - } finally { - activeQueueSizeRef.set(activeQueue.size()); - writeLock.unlock("putAll"); - } - - if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { - scheduler.registerEvent(connection.getDestination()); - } - } - - @Override - public List<FlowFileRecord> pollSwappableRecords() { - writeLock.lock(); - try { - if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { - return null; - } - - final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size())); - final Iterator<FlowFileRecord> itr = swapQueue.iterator(); - while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) { - FlowFileRecord record = itr.next(); - swapRecords.add(record); - itr.remove(); - } - - swapQueue.trimToSize(); - return swapRecords; - } finally { - writeLock.unlock("pollSwappableRecords"); - } - } - - @Override - public void putSwappedRecords(final Collection<FlowFileRecord> records) { - writeLock.lock(); - try { - try { - for (final FlowFileRecord record : records) { - swappedContentSize -= record.getSize(); - swappedRecordCount--; - activeQueueContentSize += record.getSize(); - activeQueue.add(record); - } - - if (swappedRecordCount > swapQueue.size()) { - // we have more swap files to be swapped in. - return; - } - - // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix - if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { - for (final FlowFileRecord record : swapQueue) { - activeQueue.add(record); - activeQueueContentSize += record.getSize(); - } - swapQueue.clear(); - swappedContentSize = 0L; - swappedRecordCount = 0; - swapMode = false; - } - } finally { - activeQueueSizeRef.set(activeQueue.size()); - } - } finally { - writeLock.unlock("putSwappedRecords"); - scheduler.registerEvent(connection.getDestination()); - } - } - - @Override - public void incrementSwapCount(final int numRecords, final long contentSize) { - writeLock.lock(); - try { - swappedContentSize += contentSize; - swappedRecordCount += numRecords; - } finally { - writeLock.unlock("incrementSwapCount"); - } - } - - @Override - public int unswappedSize() { - readLock.lock(); - try { - return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount(); - } finally { - readLock.unlock("unswappedSize"); - } - } - - @Override - public int getSwapRecordCount() { - readLock.lock(); - try { - return swappedRecordCount; - } finally { - readLock.unlock("getSwapRecordCount"); - } - } - - @Override - public int getSwapQueueSize() { - readLock.lock(); - try { - if (logger.isDebugEnabled()) { - final long byteToMbDivisor = 1024L * 1024L; - final QueueSize unacknowledged = unacknowledgedSizeRef.get(); - - logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", - activeQueue.size(), activeQueueContentSize / byteToMbDivisor, - swappedRecordCount, swappedContentSize / byteToMbDivisor, - unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); - } - - return swapQueue.size(); - } finally { - readLock.unlock("getSwapQueueSize"); - } - } - - private boolean isLaterThan(final Long maxAge) { - if (maxAge == null) { - return false; - } - return maxAge < System.currentTimeMillis(); - } - - private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) { - if (flowFile == null) { - return null; - } - if (expirationMillis <= 0) { - return null; - } else { - final long entryDate = flowFile.getEntryDate(); - final long expirationDate = entryDate + expirationMillis; - return expirationDate; - } - } - - @Override - public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) { - FlowFileRecord flowFile = null; - - // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); - final PreFetch preFetch = preFetchRef.get(); - if (preFetch != null) { - if (preFetch.isExpired()) { - requeueExpiredPrefetch(preFetch); - } else { - while (true) { - final FlowFileRecord next = preFetch.nextRecord(); - if (next == null) { - break; - } - - if (isLaterThan(getExpirationDate(next, expirationMillis))) { - expiredRecords.add(next); - continue; - } - - updateUnacknowledgedSize(1, next.getSize()); - return next; - } - - preFetchRef.compareAndSet(preFetch, null); - } - } - - writeLock.lock(); - try { - flowFile = doPoll(expiredRecords, expirationMillis); - return flowFile; - } finally { - activeQueueSizeRef.set(activeQueue.size()); - writeLock.unlock("poll(Set)"); - - if (flowFile != null) { - updateUnacknowledgedSize(1, flowFile.getSize()); - } - } - } - - private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) { - FlowFileRecord flowFile; - boolean isExpired; - - migrateSwapToActive(); - boolean queueFullAtStart = queueFullRef.get(); - - do { - flowFile = this.activeQueue.poll(); - - isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); - if (isExpired) { - expiredRecords.add(flowFile); - if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { - activeQueueContentSize -= flowFile.getSize(); - break; - } - } else if (flowFile != null && flowFile.isPenalized()) { - this.activeQueue.add(flowFile); - flowFile = null; - break; - } - - if (flowFile != null) { - activeQueueContentSize -= flowFile.getSize(); - } - } while (isExpired); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - - if (incrementPollCount()) { - prefetch(); - } - return isExpired ? null : flowFile; - } - - @Override - public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) { - final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults)); - - // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); - final PreFetch preFetch = preFetchRef.get(); - if (preFetch != null) { - if (preFetch.isExpired()) { - requeueExpiredPrefetch(preFetch); - } else { - long totalSize = 0L; - for (int i = 0; i < maxResults; i++) { - final FlowFileRecord next = preFetch.nextRecord(); - if (next == null) { - break; - } - - if (isLaterThan(getExpirationDate(next, expirationMillis))) { - expiredRecords.add(next); - continue; - } - - records.add(next); - totalSize += next.getSize(); - } - - // If anything was prefetched, use what we have. - if (!records.isEmpty()) { - updateUnacknowledgedSize(records.size(), totalSize); - return records; - } - - preFetchRef.compareAndSet(preFetch, null); - } - } - - writeLock.lock(); - try { - doPoll(records, maxResults, expiredRecords); - } finally { - activeQueueSizeRef.set(activeQueue.size()); - writeLock.unlock("poll(int, Set)"); - } - return records; - } - - private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) { - migrateSwapToActive(); - - final boolean queueFullAtStart = queueFullRef.get(); - - final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); - - long expiredBytes = 0L; - for (final FlowFileRecord record : expiredRecords) { - expiredBytes += record.getSize(); - } - - activeQueueContentSize -= bytesDrained; - updateUnacknowledgedSize(records.size(), bytesDrained - expiredBytes); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - - if (incrementPollCount()) { - prefetch(); - } - } - - /** - * If there are FlowFiles waiting on the swap queue, move them to the active - * queue until we meet our threshold. This prevents us from having to swap - * them to disk & then back out. - * - * This method MUST be called with the writeLock held. - */ - private void migrateSwapToActive() { - // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't - // have to swap them out & then swap them back in. - // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles - // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out. - // In particular, this can happen if the queue is typically filled with surges. - // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave - // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in, - // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and - // swapped back in again. - // Calling this method when records are polled prevents this condition by migrating FlowFiles from the - // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out - // to disk, because we want them to be swapped back in in the same order that they were swapped out. - - // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense - // of other checks for 99.999% of the cases. - if (swappedRecordCount == 0 && swapQueue.isEmpty()) { - return; - } - - if (swappedRecordCount > swapQueue.size()) { - // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for - // an external process to swap FlowFiles back in. - return; - } - - final Iterator<FlowFileRecord> swapItr = swapQueue.iterator(); - while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { - final FlowFileRecord toMigrate = swapItr.next(); - activeQueue.add(toMigrate); - activeQueueContentSize += toMigrate.getSize(); - swappedContentSize -= toMigrate.getSize(); - swappedRecordCount--; - - swapItr.remove(); - } - - if (swappedRecordCount == 0) { - swapMode = false; - } - } - - @Override - public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) { - long drainedSize = 0L; - FlowFileRecord pulled = null; - - final long expirationMillis = this.flowFileExpirationMillis.get(); - while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { - if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { - expiredRecords.add(pulled); - if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { - break; - } - } else { - if (pulled.isPenalized()) { - sourceQueue.add(pulled); - break; - } - destination.add(pulled); - } - drainedSize += pulled.getSize(); - } - return drainedSize; - } - - @Override - public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { - writeLock.lock(); - try { - migrateSwapToActive(); - if (activeQueue.isEmpty()) { - return Collections.emptyList(); - } - - final long expirationMillis = this.flowFileExpirationMillis.get(); - final boolean queueFullAtStart = queueFullRef.get(); - - final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); - final List<FlowFileRecord> unselected = new ArrayList<>(); - - while (true) { - FlowFileRecord flowFile = this.activeQueue.poll(); - if (flowFile == null) { - break; - } - - final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); - if (isExpired) { - expiredRecords.add(flowFile); - activeQueueContentSize -= flowFile.getSize(); - - if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { - break; - } else { - continue; - } - } else if (flowFile.isPenalized()) { - this.activeQueue.add(flowFile); - flowFile = null; - break; // just stop searching because the rest are all penalized. - } - - final FlowFileFilterResult result = filter.filter(flowFile); - if (result.isAccept()) { - activeQueueContentSize -= flowFile.getSize(); - - updateUnacknowledgedSize(1, flowFile.getSize()); - selectedFlowFiles.add(flowFile); - } else { - unselected.add(flowFile); - } - - if (!result.isContinue()) { - break; - } - } - - this.activeQueue.addAll(unselected); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - - return selectedFlowFiles; - } finally { - activeQueueSizeRef.set(activeQueue.size()); - writeLock.unlock("poll(Filter, Set)"); - } - } - - private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable { - - private static final long serialVersionUID = 1L; - private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); - - private Prioritizer(final List<FlowFilePrioritizer> priorities) { - if (null != priorities) { - prioritizers.addAll(priorities); - } - } - - @Override - public int compare(final FlowFileRecord f1, final FlowFileRecord f2) { - int returnVal = 0; - final boolean f1Penalized = f1.isPenalized(); - final boolean f2Penalized = f2.isPenalized(); - - if (f1Penalized && !f2Penalized) { - return 1; - } else if (!f1Penalized && f2Penalized) { - return -1; - } - - if (f1Penalized && f2Penalized) { - if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) { - return -1; - } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) { - return 1; - } - } - - if (!prioritizers.isEmpty()) { - for (final FlowFilePrioritizer prioritizer : prioritizers) { - returnVal = prioritizer.compare(f1, f2); - if (returnVal != 0) { - return returnVal; - } - } - } - - final ContentClaim claim1 = f1.getContentClaim(); - final ContentClaim claim2 = f2.getContentClaim(); - - // put the one without a claim first - if (claim1 == null && claim2 != null) { - return -1; - } else if (claim1 != null && claim2 == null) { - return 1; - } else if (claim1 != null && claim2 != null) { - final int claimComparison = claim1.compareTo(claim2); - if (claimComparison != 0) { - return claimComparison; - } - - final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset()); - if (claimOffsetComparison != 0) { - return claimOffsetComparison; - } - } - - return Long.compare(f1.getId(), f2.getId()); - } - } - - @Override - public String getFlowFileExpiration() { - return flowFileExpirationPeriod.get(); - } - - @Override - public int getFlowFileExpiration(final TimeUnit timeUnit) { - return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); - } - - @Override - public void setFlowFileExpiration(final String flowExpirationPeriod) { - final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS); - if (millis < 0) { - throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); - } - this.flowFileExpirationPeriod.set(flowExpirationPeriod); - this.flowFileExpirationMillis.set(millis); - } - - @Override - public String toString() { - return "FlowFileQueue[id=" + identifier + "]"; - } - - /** - * Lock the queue so that other threads are unable to interact with the - * queue - */ - public void lock() { - writeLock.lock(); - } - - /** - * Unlock the queue - */ - public void unlock() { - writeLock.unlock("external unlock"); - } - - private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { - boolean updated = false; - - do { - QueueSize queueSize = unacknowledgedSizeRef.get(); - final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); - updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); - } while (!updated); - } - - private void requeueExpiredPrefetch(final PreFetch prefetch) { - if (prefetch == null) { - return; - } - - writeLock.lock(); - try { - final long contentSizeRequeued = prefetch.requeue(activeQueue); - this.activeQueueContentSize += contentSizeRequeued; - this.preFetchRef.compareAndSet(prefetch, null); - } finally { - writeLock.unlock("requeueExpiredPrefetch"); - } - } - - /** - * MUST be called with write lock held. - */ - private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>(); - - private void prefetch() { - if (activeQueue.isEmpty()) { - return; - } - - final int numToFetch = Math.min(prefetchSize, activeQueue.size()); - - final PreFetch curPreFetch = preFetchRef.get(); - if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) { - return; - } - - final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch); - long contentSize = 0L; - for (int i = 0; i < numToFetch; i++) { - final FlowFileRecord record = activeQueue.poll(); - if (record == null || record.isPenalized()) { - // not enough unpenalized records to pull. Put all records back and return - activeQueue.addAll(buffer); - if ( record != null ) { - activeQueue.add(record); - } - return; - } else { - buffer.add(record); - contentSize += record.getSize(); - } - } - - activeQueueContentSize -= contentSize; - preFetchRef.set(new PreFetch(buffer)); - } - - private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess()); - - private boolean incrementPollCount() { - pollCounts.add(new TimestampedLong(1L)); - final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue(); - return totalCount > PREFETCH_POLL_THRESHOLD * 5; - } - - private static class PreFetch { - - private final List<FlowFileRecord> records; - private final AtomicInteger pointer = new AtomicInteger(0); - private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L); - private final AtomicLong contentSize = new AtomicLong(0L); - - public PreFetch(final List<FlowFileRecord> records) { - this.records = records; - - long totalSize = 0L; - for (final FlowFileRecord record : records) { - totalSize += record.getSize(); - } - contentSize.set(totalSize); - } - - public FlowFileRecord nextRecord() { - final int nextValue = pointer.getAndIncrement(); - if (nextValue >= records.size()) { - return null; - } - - final FlowFileRecord flowFile = records.get(nextValue); - contentSize.addAndGet(-flowFile.getSize()); - return flowFile; - } - - public QueueSize size() { - final int pointerIndex = pointer.get(); - final int count = records.size() - pointerIndex; - if (count < 0) { - return new QueueSize(0, 0L); - } - - final long bytes = contentSize.get(); - return new QueueSize(count, bytes); - } - - public boolean isExpired() { - return System.nanoTime() > expirationTime; - } - - private long requeue(final Queue<FlowFileRecord> queue) { - // get the current pointer and prevent any other thread from accessing the rest of the elements - final int curPointer = pointer.getAndAdd(records.size()); - if (curPointer < records.size() - 1) { - final List<FlowFileRecord> subList = records.subList(curPointer, records.size()); - long contentSize = 0L; - for (final FlowFileRecord record : subList) { - contentSize += record.getSize(); - } - - queue.addAll(subList); - - return contentSize; - } - return 0L; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java deleted file mode 100644 index e34e043..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.FormatUtils; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - -public class StandardFunnel implements Funnel { - - public static final long MINIMUM_PENALIZATION_MILLIS = 0L; - public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; - public static final long MINIMUM_YIELD_MILLIS = 0L; - public static final long DEFAULT_YIELD_PERIOD = 1000L; - public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; - - private final String identifier; - private final Set<Connection> outgoingConnections; - private final List<Connection> incomingConnections; - private final List<Relationship> relationships; - - private final AtomicReference<ProcessGroup> processGroupRef; - private final AtomicReference<Position> position; - private final AtomicReference<String> penalizationPeriod; - private final AtomicReference<String> yieldPeriod; - private final AtomicReference<String> schedulingPeriod; - private final AtomicReference<String> name; - private final AtomicLong schedulingNanos; - private final AtomicBoolean lossTolerant; - private final AtomicReference<ScheduledState> scheduledState; - private final AtomicLong yieldExpiration; - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) { - this.identifier = identifier; - this.processGroupRef = new AtomicReference<>(processGroup); - - outgoingConnections = new HashSet<>(); - incomingConnections = new ArrayList<>(); - - final List<Relationship> relationships = new ArrayList<>(); - relationships.add(Relationship.ANONYMOUS); - this.relationships = Collections.unmodifiableList(relationships); - - lossTolerant = new AtomicBoolean(false); - position = new AtomicReference<>(new Position(0D, 0D)); - scheduledState = new AtomicReference<>(ScheduledState.STOPPED); - penalizationPeriod = new AtomicReference<>("30 sec"); - yieldPeriod = new AtomicReference<>("1 sec"); - yieldExpiration = new AtomicLong(0L); - schedulingPeriod = new AtomicReference<>("0 millis"); - schedulingNanos = new AtomicLong(30000); - name = new AtomicReference<>("Funnel"); - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public Collection<Relationship> getRelationships() { - return relationships; - } - - @Override - public Relationship getRelationship(final String relationshipName) { - return (Relationship.ANONYMOUS.getName().equals(relationshipName)) ? Relationship.ANONYMOUS : null; - } - - @Override - public void addConnection(final Connection connection) throws IllegalArgumentException { - writeLock.lock(); - try { - if (!requireNonNull(connection).getSource().equals(this) && !connection.getDestination().equals(this)) { - throw new IllegalArgumentException("Cannot add a connection to a Funnel for which the Funnel is neither the Source nor the Destination"); - } - if (connection.getSource().equals(this) && connection.getDestination().equals(this)) { - throw new IllegalArgumentException("Cannot add a connection from a Funnel back to itself"); - } - - if (connection.getDestination().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - if (!incomingConnections.contains(connection)) { - incomingConnections.add(connection); - } - } - - if (connection.getSource().equals(this)) { - // don't add the connection twice. This may occur if we have a self-loop because we will be told - // to add the connection once because we are the source and again because we are the destination. - if (!outgoingConnections.contains(connection)) { - for (final Relationship relationship : connection.getRelationships()) { - if (!relationship.equals(Relationship.ANONYMOUS)) { - throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Funnels"); - } - } - - outgoingConnections.add(connection); - } - } - } finally { - writeLock.unlock(); - } - } - - @Override - public boolean hasIncomingConnection() { - readLock.lock(); - try { - return !incomingConnections.isEmpty(); - } finally { - readLock.unlock(); - } - } - - @Override - public void updateConnection(final Connection connection) throws IllegalStateException { - if (requireNonNull(connection).getSource().equals(this)) { - writeLock.lock(); - try { - if (!outgoingConnections.remove(connection)) { - throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); - } - outgoingConnections.add(connection); - } finally { - writeLock.unlock(); - } - } - - if (connection.getDestination().equals(this)) { - writeLock.lock(); - try { - if (!incomingConnections.remove(connection)) { - throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); - } - incomingConnections.add(connection); - } finally { - writeLock.unlock(); - } - } - } - - @Override - public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { - writeLock.lock(); - try { - if (!requireNonNull(connection).getSource().equals(this)) { - final boolean existed = incomingConnections.remove(connection); - if (!existed) { - throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode"); - } - return; - } - - final boolean removed = outgoingConnections.remove(connection); - if (!removed) { - throw new IllegalStateException(connection + " is not registered with " + this); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public Set<Connection> getConnections() { - readLock.lock(); - try { - return Collections.unmodifiableSet(outgoingConnections); - } finally { - readLock.unlock(); - } - } - - @Override - public Set<Connection> getConnections(final Relationship relationship) { - readLock.lock(); - try { - if (relationship.equals(Relationship.ANONYMOUS)) { - return Collections.unmodifiableSet(outgoingConnections); - } - - throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Funnels"); - } finally { - readLock.unlock(); - } - } - - @Override - public List<Connection> getIncomingConnections() { - readLock.lock(); - try { - return new ArrayList<>(incomingConnections); - } finally { - readLock.unlock(); - } - } - - @Override - public Position getPosition() { - return position.get(); - } - - @Override - public void setPosition(Position position) { - this.position.set(position); - } - - @Override - public String getName() { - return name.get(); - } - - /** - * Throws {@link UnsupportedOperationException} - * - * @param name - */ - @Override - public void setName(final String name) { - throw new UnsupportedOperationException(); - } - - @Override - public String getComments() { - return ""; - } - - @Override - public void setComments(final String comments) { - throw new UnsupportedOperationException(); - } - - @Override - public ProcessGroup getProcessGroup() { - return processGroupRef.get(); - } - - @Override - public void setProcessGroup(final ProcessGroup group) { - processGroupRef.set(group); - } - - @Override - public boolean isAutoTerminated(Relationship relationship) { - return false; - } - - @Override - public boolean isRunning() { - return isRunning(this); - } - - private boolean isRunning(final Connectable source) { - return getScheduledState() == ScheduledState.RUNNING; - } - - @Override - public boolean isTriggerWhenEmpty() { - return false; - } - - @Override - public ScheduledState getScheduledState() { - return scheduledState.get(); - } - - @Override - public boolean isLossTolerant() { - return lossTolerant.get(); - } - - @Override - public void setLossTolerant(final boolean lossTolerant) { - this.lossTolerant.set(lossTolerant); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString(); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - final ProcessSession session = sessionFactory.createSession(); - - try { - onTrigger(context, session); - session.commit(); - } catch (final ProcessException e) { - session.rollback(); - throw e; - } catch (final Throwable t) { - session.rollback(); - throw new RuntimeException(t); - } - } - - private void onTrigger(final ProcessContext context, final ProcessSession session) { - readLock.lock(); - try { - Set<Relationship> available = context.getAvailableRelationships(); - int transferred = 0; - while (!available.isEmpty()) { - final List<FlowFile> flowFiles = session.get(10); - if (flowFiles.isEmpty()) { - break; - } - - transferred += flowFiles.size(); - session.transfer(flowFiles, Relationship.ANONYMOUS); - session.commit(); - available = context.getAvailableRelationships(); - } - - if (transferred == 0) { - context.yield(); - } - } finally { - readLock.unlock(); - } - } - - /** - * Has no effect - */ - @Override - public void setMaxConcurrentTasks(int taskCount) { - } - - @Override - public int getMaxConcurrentTasks() { - return 1; - } - - @Override - public void setScheduledState(final ScheduledState scheduledState) { - this.scheduledState.set(scheduledState); - } - - @Override - public ConnectableType getConnectableType() { - return ConnectableType.FUNNEL; - } - - @Override - @SuppressWarnings("unchecked") - public Collection<ValidationResult> getValidationErrors() { - return Collections.EMPTY_LIST; - } - - /** - * Updates the amount of time that this processor should avoid being - * scheduled when the processor calls - * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} - * - * @param yieldPeriod - */ - @Override - public void setYieldPeriod(final String yieldPeriod) { - final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); - if (yieldMillis < 0) { - throw new IllegalArgumentException("Yield duration must be positive"); - } - this.yieldPeriod.set(yieldPeriod); - } - - /** - * @param schedulingPeriod - */ - @Override - public void setScheduldingPeriod(final String schedulingPeriod) { - final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); - if (schedulingNanos < 0) { - throw new IllegalArgumentException("Scheduling Period must be positive"); - } - - this.schedulingPeriod.set(schedulingPeriod); - this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - @Override - public String getPenalizationPeriod() { - return penalizationPeriod.get(); - } - - /** - * Causes the processor not to be scheduled for some period of time. This - * duration can be obtained and set via the - * {@link #getYieldPeriod(TimeUnit)} and - * {@link #setYieldPeriod(long, TimeUnit)} methods. - */ - @Override - public void yield() { - final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); - yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); - } - - @Override - public long getYieldExpiration() { - return yieldExpiration.get(); - } - - @Override - public String getSchedulingPeriod() { - return schedulingPeriod.get(); - } - - @Override - public void setPenalizationPeriod(final String penalizationPeriod) { - this.penalizationPeriod.set(penalizationPeriod); - } - - @Override - public String getYieldPeriod() { - return yieldPeriod.get(); - } - - @Override - public long getYieldPeriod(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); - } - - @Override - public long getSchedulingPeriod(final TimeUnit timeUnit) { - return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); - } - - @Override - public boolean isSideEffectFree() { - return true; - } - - @Override - public void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException { - if (ignoreConnections) { - return; - } - - readLock.lock(); - try { - for (final Connection connection : outgoingConnections) { - connection.verifyCanDelete(); - } - - for (final Connection connection : incomingConnections) { - if (connection.getSource().equals(this)) { - connection.verifyCanDelete(); - } else { - throw new IllegalStateException(this + " is the destination of another component"); - } - } - } finally { - readLock.unlock(); - } - } - - @Override - public void verifyCanDelete() { - verifyCanDelete(false); - } - - @Override - public void verifyCanStart() { - } - - @Override - public void verifyCanStop() { - } - - @Override - public void verifyCanUpdate() { - } - - @Override - public void verifyCanEnable() { - } - - @Override - public void verifyCanDisable() { - } - - @Override - public SchedulingStrategy getSchedulingStrategy() { - return SchedulingStrategy.TIMER_DRIVEN; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java deleted file mode 100644 index df3c251..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.Map; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; - -public interface ValidationContextFactory { - - ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java deleted file mode 100644 index 2f43600..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.connectable.Connectable; - -public interface WorkerQueue { - - EventBasedWorker poll(long timeout, TimeUnit timeUnit); - - void offer(Connectable worker); - - void setClustered(boolean clustered); - - void setPrimary(boolean primary); - - void suspendWork(Connectable worker); - - void resumeWork(Connectable worker); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java deleted file mode 100644 index 368ed1b..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.exception; - -import java.io.IOException; - -public class CommunicationsException extends IOException { - - private static final long serialVersionUID = 142343242323423L; - - public CommunicationsException() { - super(); - } - - public CommunicationsException(final Throwable cause) { - super(cause); - } - - public CommunicationsException(final String explanation) { - super(explanation); - } - - public CommunicationsException(final String explanation, final Throwable cause) { - super(explanation, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java deleted file mode 100644 index 0ff68b0..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.exception; - -public class ControllerServiceAlreadyExistsException extends RuntimeException { - - private static final long serialVersionUID = -544424320587059277L; - - /** - * Constructs a default exception - * @param id - */ - public ControllerServiceAlreadyExistsException(final String id) { - super("A Controller Service already exists with ID " + id); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java deleted file mode 100644 index 4cdbe54..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.exception; - -public class ControllerServiceNotFoundException extends RuntimeException { - - private static final long serialVersionUID = -544424320587059277L; - - /** - * Constructs a default exception - */ - public ControllerServiceNotFoundException() { - super(); - } - - /** - * @param message - */ - public ControllerServiceNotFoundException(String message) { - super(message); - } - - /** - * @param cause - */ - public ControllerServiceNotFoundException(Throwable cause) { - super(cause); - } - - /** - * @param message - * @param cause - */ - public ControllerServiceNotFoundException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java deleted file mode 100644 index c4aba44..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.exception; - -public class ProcessorInstantiationException extends Exception { - - private static final long serialVersionUID = 189273489L; - - public ProcessorInstantiationException(final String className, final Throwable t) { - super(className, t); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java deleted file mode 100644 index 5acca16..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.exception; - -public class ProcessorLifeCycleException extends RuntimeException { - - private static final long serialVersionUID = 8392341500511490941L; - - public ProcessorLifeCycleException(final String message, final Throwable t) { - super(message, t); - } - - public ProcessorLifeCycleException(final Throwable t) { - super(t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java deleted file mode 100644 index 97c44b5..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.label; - -import java.util.Map; - -import org.apache.nifi.connectable.Position; -import org.apache.nifi.connectable.Size; -import org.apache.nifi.groups.ProcessGroup; - -public interface Label { - - String getIdentifier(); - - Position getPosition(); - - void setPosition(Position position); - - Map<String, String> getStyle(); - - void setStyle(Map<String, String> style); - - Size getSize(); - - void setSize(Size size); - - ProcessGroup getProcessGroup(); - - void setProcessGroup(ProcessGroup group); - - String getValue(); - - void setValue(String value); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java deleted file mode 100644 index ced6ff9..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.reporting; - -public class ReportingTaskInstantiationException extends Exception { - - private static final long serialVersionUID = 189234789237L; - - public ReportingTaskInstantiationException(final String className, final Throwable t) { - super(className, t); - } - - public ReportingTaskInstantiationException(final String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java deleted file mode 100644 index 6ce7ba6..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import org.apache.nifi.controller.repository.claim.ContentClaim; - -/** - * - * @author none - */ -public class ContentNotFoundException extends RuntimeException { - - private static final long serialVersionUID = 19048239082L; - private final transient ContentClaim claim; - - public ContentNotFoundException(final ContentClaim claim) { - super("Could not find content for " + claim); - this.claim = claim; - } - - public ContentNotFoundException(final ContentClaim claim, final Throwable t) { - super("Could not find content for " + claim, t); - this.claim = claim; - } - - public ContentNotFoundException(final ContentClaim claim, final String message) { - super("Could not find content for " + claim + ": " + message); - this.claim = claim; - } - - public ContentClaim getMissingClaim() { - return claim; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java deleted file mode 100644 index de231ed..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import java.util.List; - -import org.apache.nifi.controller.Counter; - -public interface CounterRepository { - - void adjustCounter(String counterContext, String name, long delta); - - Counter getCounter(String counterContext, String name); - - List<Counter> getCounters(); - - List<Counter> getCounters(String counterContext); - - Counter resetCounter(String identifier); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java deleted file mode 100644 index f07a530..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -public interface FlowFileEvent { - - String getComponentIdentifier(); - - int getFlowFilesIn(); - - int getFlowFilesOut(); - - int getFlowFilesRemoved(); - - long getContentSizeIn(); - - long getContentSizeOut(); - - long getContentSizeRemoved(); - - long getBytesRead(); - - long getBytesWritten(); - - long getProcessingNanoseconds(); - - long getAverageLineageMillis(); - - long getAggregateLineageMillis(); - - int getFlowFilesReceived(); - - long getBytesReceived(); - - int getFlowFilesSent(); - - long getBytesSent(); - - int getInvocations(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java deleted file mode 100644 index 2eb3caf..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import java.io.Closeable; -import java.io.IOException; - -/** - * - * @author none - */ -public interface FlowFileEventRepository extends Closeable { - - /** - * Updates the repository to include a new FlowFile processing event - * - * @param event - * @throws java.io.IOException - */ - void updateRepository(FlowFileEvent event) throws IOException; - - /** - * Returns a report of processing activity since the given time - * @param sinceEpochMillis - * @return - */ - RepositoryStatusReport reportTransferEvents(long sinceEpochMillis); - - /** - * Causes any flow file events of the given entry age in epoch milliseconds - * or older to be purged from the repository - * - * @param cutoffEpochMilliseconds - */ - void purgeTransferEvents(long cutoffEpochMilliseconds); -}
