// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
// new file mode 100644
// index 0000000..5bf75a4
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
// @@ -0,0 +1,460 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Base class for {@link FlowFileQueue} implementations. It holds the state that does not depend on how
 * FlowFiles are stored: the identifier, expiration period, back-pressure thresholds, load-balance settings,
 * and the bookkeeping for asynchronous "list" and "drop" requests.
 *
 * <p>Subclasses supply the FlowFiles to list ({@link #getListableFlowFiles()}) and the actual drop
 * logic ({@link #dropFlowFiles(DropFlowFileRequest, String)}).</p>
 */
public abstract class AbstractFlowFileQueue implements FlowFileQueue {
    private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);

    // Finished list/drop requests are purged only once more than this many requests are being tracked.
    private static final int REQUEST_PURGE_THRESHOLD = 10;
    // A finished request must have been untouched for longer than this before it is purged.
    private static final long REQUEST_RETENTION_MILLIS = TimeUnit.MINUTES.toMillis(5L);

    private final String identifier;
    private final FlowFileRepository flowFileRepository;
    private final ProvenanceEventRepository provRepository;
    private final ResourceClaimManager resourceClaimManager;
    private final ProcessScheduler scheduler;

    private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
    private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));

    private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();

    // Guarded by 'this' (see the synchronized load-balance accessors at the bottom of the class).
    private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
    private String partitioningAttribute = null;

    private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;


    public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
            final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
        this.identifier = identifier;
        this.scheduler = scheduler;
        this.flowFileRepository = flowFileRepo;
        this.provRepository = provRepo;
        this.resourceClaimManager = resourceClaimManager;
    }

    @Override
    public String getIdentifier() {
        return identifier;
    }

    protected ProcessScheduler getScheduler() {
        return scheduler;
    }

    @Override
    public String getFlowFileExpiration() {
        return expirationPeriod.get().getPeriod();
    }

    /**
     * @param timeUnit the unit to express the expiration period in
     * @return the expiration period converted to {@code timeUnit}; values that do not fit into an
     *         {@code int} are reported as {@link Integer#MAX_VALUE} instead of silently wrapping
     */
    @Override
    public int getFlowFileExpiration(final TimeUnit timeUnit) {
        final long converted = timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
        // A plain (int) cast would wrap around, e.g. for a period of more than ~24.8 days expressed in milliseconds.
        return (int) Math.min(converted, Integer.MAX_VALUE);
    }

    /**
     * Sets the expiration period.
     *
     * @param flowExpirationPeriod a duration string understood by {@link FormatUtils#getTimeDuration}
     * @throws IllegalArgumentException if the period parses to a negative duration
     */
    @Override
    public void setFlowFileExpiration(final String flowExpirationPeriod) {
        final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
        if (millis < 0) {
            // Zero is accepted (it is also the default of "0 mins"), so the requirement is "non-negative".
            throw new IllegalArgumentException("FlowFile Expiration Period must be non-negative");
        }

        expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
    }

    @Override
    public void setBackPressureObjectThreshold(final long threshold) {
        // Atomically replace only the count; the data-size settings of the current value are carried over.
        maxQueueSize.updateAndGet(current -> new MaxQueueSize(current.getMaxSize(), current.getMaxBytes(), threshold));
    }

    @Override
    public long getBackPressureObjectThreshold() {
        return getMaxQueueSize().getMaxCount();
    }

    @Override
    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
        final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();

        // Atomically replace only the data size; the object-count setting of the current value is carried over.
        maxQueueSize.updateAndGet(current -> new MaxQueueSize(maxDataSize, maxBytes, current.getMaxCount()));
    }

    @Override
    public String getBackPressureDataSizeThreshold() {
        return getMaxQueueSize().getMaxSize();
    }

    private MaxQueueSize getMaxQueueSize() {
        return maxQueueSize.get();
    }

    /**
     * @return {@code true} if a positive object-count or byte threshold is configured and the current
     *         {@code size()} has reached it; {@code false} if neither threshold is positive or neither is reached
     */
    @Override
    public boolean isFull() {
        final MaxQueueSize maxSize = getMaxQueueSize();

        // Check if max size is set
        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
            return false;
        }

        final QueueSize queueSize = size();
        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
            return true;
        }

        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
            return true;
        }

        return false;
    }


    /**
     * Starts a daemon thread that summarizes up to {@code maxResults} FlowFiles (sorted with the queue's
     * prioritizers) and records the result on the returned request object.
     *
     * @param requestIdentifier key under which the request can later be looked up or canceled
     * @param maxResults listing stops once this many summaries have been collected
     * @return the request object that the background thread updates
     */
    @Override
    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
        purgeOldListRequests();

        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());

        final Thread t = new Thread(() -> {
            final List<FlowFileSummary> summaries = new ArrayList<>();

            // Work on a list holding all of the listable FlowFiles rather than on the live queue.
            // We do this so that we don't have to hold the lock any longer than absolutely necessary.
            // We cannot simply pull the first 'maxResults' records from the queue, however, because the
            // Iterator provided by PriorityQueue does not return records in order. So we would have to either
            // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
            // do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
            // the sorting to put the records back. So even though this has an expense of Java heap to create the
            // extra collection, we are making this trade-off to avoid locking the queue any longer than required.
            final List<FlowFileRecord> allFlowFiles = getListableFlowFiles();
            final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities());

            listRequest.setState(ListFlowFileState.CALCULATING_LIST);

            // sort the FlowFileRecords so that we have the list in the same order as on the queue.
            allFlowFiles.sort(prioritizer);

            int position = 0;
            for (final FlowFileRecord flowFile : allFlowFiles) {
                summaries.add(summarize(flowFile, ++position));
                if (summaries.size() >= maxResults) {
                    break;
                }
            }

            // Log the queue itself and the real number of results (previously a counter that was never incremented).
            logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", this, summaries.size());
            listRequest.setFlowFileSummaries(summaries);
            listRequest.setState(ListFlowFileState.COMPLETE);
        }, "List FlowFiles for Connection " + getIdentifier());
        t.setDaemon(true);
        t.start();

        listRequestMap.put(requestIdentifier, listRequest);
        return listRequest;
    }

    /**
     * Purges any old list requests from the map just to keep it clean. If there are very few requests,
     * which is usually the case, nothing is done. Only requests in state COMPLETE or FAILURE whose last
     * update is older than the retention period are removed.
     */
    private void purgeOldListRequests() {
        if (listRequestMap.size() <= REQUEST_PURGE_THRESHOLD) {
            return;
        }

        listRequestMap.entrySet().removeIf((final Map.Entry<String, ListFlowFileRequest> entry) -> {
            final ListFlowFileRequest request = entry.getValue();
            final ListFlowFileState state = request.getState();
            final boolean completed = state == ListFlowFileState.COMPLETE || state == ListFlowFileState.FAILURE;
            return completed && System.currentTimeMillis() - request.getLastUpdated() > REQUEST_RETENTION_MILLIS;
        });
    }

    @Override
    public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
        return listRequestMap.get(requestIdentifier);
    }

    /**
     * Removes the list request from the tracking map and cancels it.
     *
     * @return the canceled request, or {@code null} if no request with that identifier is tracked
     */
    @Override
    public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
        logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
        final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
        if (request != null) {
            request.cancel();
        }

        return request;
    }

    /**
     * @return all FlowFiles that should be listed in response to a List Queue request
     */
    protected abstract List<FlowFileRecord> getListableFlowFiles();


    /**
     * Registers a drop request. If the queue is currently empty the request is completed immediately;
     * otherwise a daemon thread is started that calls {@link #dropFlowFiles(DropFlowFileRequest, String)}.
     *
     * @param requestIdentifier key under which the request can later be looked up or canceled
     * @param requestor the identity of the user/agent who made the request
     * @return the request object tracking the drop
     */
    @Override
    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
        logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);

        purgeOldDropRequests();

        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
        final QueueSize originalSize = size();
        dropRequest.setCurrentSize(originalSize);
        dropRequest.setOriginalSize(originalSize);
        if (originalSize.getObjectCount() == 0) {
            // Nothing to drop: report completion right away without spawning a thread.
            dropRequest.setDroppedSize(originalSize);
            dropRequest.setState(DropFlowFileState.COMPLETE);
            dropRequestMap.put(requestIdentifier, dropRequest);
            return dropRequest;
        }

        final Thread t = new Thread(() -> dropFlowFiles(dropRequest, requestor), "Drop FlowFiles for Connection " + getIdentifier());
        t.setDaemon(true);
        t.start();

        dropRequestMap.put(requestIdentifier, dropRequest);

        return dropRequest;
    }

    /**
     * Purges any old drop requests from the map just to keep it clean. If there are very few requests,
     * which is usually the case, nothing is done. Only requests in state COMPLETE or FAILURE whose last
     * update is older than the retention period are removed.
     */
    private void purgeOldDropRequests() {
        if (dropRequestMap.size() <= REQUEST_PURGE_THRESHOLD) {
            return;
        }

        dropRequestMap.entrySet().removeIf((final Map.Entry<String, DropFlowFileRequest> entry) -> {
            final DropFlowFileRequest request = entry.getValue();
            final DropFlowFileState state = request.getState();
            final boolean completed = state == DropFlowFileState.COMPLETE || state == DropFlowFileState.FAILURE;
            return completed && System.currentTimeMillis() - request.getLastUpdated() > REQUEST_RETENTION_MILLIS;
        });
    }


    /**
     * Removes the drop request from the tracking map and cancels it.
     *
     * @return the canceled request, or {@code null} if no request with that identifier is tracked
     */
    @Override
    public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
        final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
        if (request == null) {
            return null;
        }

        request.cancel();
        return request;
    }

    @Override
    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
        return dropRequestMap.get(requestIdentifier);
    }

    /**
     * Synchronously drops all FlowFiles in the queue
     *
     * @param dropRequest the request
     * @param requestor the identity of the user/agent who made the request
     */
    protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor);

    @Override
    public void verifyCanList() throws IllegalStateException {
    }


    /**
     * Creates a lightweight summary of the given FlowFile.
     *
     * @param flowFile the FlowFile to summarize
     * @param position the position to report from {@link FlowFileSummary#getPosition()}
     * @return a summary holding copies of the UUID, filename, size, last-queued time (0 when the FlowFile
     *         reports {@code null}), lineage start date and penalized flag
     */
    protected FlowFileSummary summarize(final FlowFile flowFile, final int position) {
        // extract all of the information that we care about into new variables rather than just
        // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
        // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
        // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
        // which can be problematic if we expect them to be swapped out.
        final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
        final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
        final long size = flowFile.getSize();
        final Long lastQueuedTime = flowFile.getLastQueueDate();
        final long lineageStart = flowFile.getLineageStartDate();
        final boolean penalized = flowFile.isPenalized();

        return new FlowFileSummary() {
            @Override
            public String getUuid() {
                return uuid;
            }

            @Override
            public String getFilename() {
                return filename;
            }

            @Override
            public int getPosition() {
                return position;
            }

            @Override
            public long getSize() {
                return size;
            }

            @Override
            public long getLastQueuedTime() {
                return lastQueuedTime == null ? 0L : lastQueuedTime;
            }

            @Override
            public long getLineageStartDate() {
                return lineageStart;
            }

            @Override
            public boolean isPenalized() {
                return penalized;
            }
        };
    }

    /**
     * Drops the given FlowFiles: builds a DROP provenance event and a delete repository record for each one,
     * decrements the claimant count of each FlowFile's resource claim (when it has one), then registers the
     * provenance events and updates the FlowFile repository.
     *
     * @param flowFiles the FlowFiles to drop
     * @param requestor the identity of the user/agent who made the request
     * @return the number of FlowFiles given and the sum of their sizes
     * @throws IOException if updating the FlowFile repository fails
     */
    protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
        // Create a Provenance Event and a FlowFile Repository record for each FlowFile
        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
        for (final FlowFileRecord flowFile : flowFiles) {
            provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
        }

        // Deliberately a second pass: no claimant count is touched unless every event/record could be created.
        long dropContentSize = 0L;
        for (final FlowFileRecord flowFile : flowFiles) {
            dropContentSize += flowFile.getSize();
            final ContentClaim contentClaim = flowFile.getContentClaim();
            if (contentClaim == null) {
                continue;
            }

            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            if (resourceClaim == null) {
                continue;
            }

            resourceClaimManager.decrementClaimantCount(resourceClaim);
        }

        provRepository.registerEvents(provenanceEvents);
        flowFileRepository.updateRepository(flowFileRepoRecords);
        return new QueueSize(flowFiles.size(), dropContentSize);
    }

    /**
     * Builds the DROP provenance event for a FlowFile that is being removed from this queue.
     */
    private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flowFile, final String requestor) {
        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
        builder.fromFlowFile(flowFile);
        builder.setEventType(ProvenanceEventType.DROP);
        builder.setLineageStartDate(flowFile.getLineageStartDate());
        builder.setComponentId(getIdentifier());
        builder.setComponentType("Connection");
        builder.setAttributes(flowFile.getAttributes(), Collections.emptyMap());
        builder.setDetails("FlowFile Queue emptied by " + requestor);
        builder.setSourceQueueIdentifier(getIdentifier());

        final ContentClaim contentClaim = flowFile.getContentClaim();
        if (contentClaim != null) {
            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            // drop() treats a content claim without a resource claim as legal, so guard against it here as well
            // instead of failing the whole drop with a NullPointerException.
            if (resourceClaim != null) {
                builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
            }
        }

        return builder.build();
    }

    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
        return new DropFlowFileRepositoryRecord(this, flowFile);
    }

    /**
     * Sets the load-balance strategy together with its partitioning attribute.
     *
     * @throws IllegalArgumentException if the strategy is PARTITION_BY_ATTRIBUTE and the attribute name
     *         is rejected by {@code FlowFile.KeyValidator}
     */
    @Override
    public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
        if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
            throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
        }

        this.loadBalanceStrategy = strategy;
        this.partitioningAttribute = partitioningAttribute;
    }

    @Override
    public synchronized String getPartitioningAttribute() {
        return partitioningAttribute;
    }

    @Override
    public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
        return loadBalanceStrategy;
    }

    @Override
    public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
        this.compression = compression;
    }

    @Override
    public synchronized LoadBalanceCompression getLoadBalanceCompression() {
        return compression;
    }
}
// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
// new file mode 100644
// index 0000000..9a220ae
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
// @@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.events.EventReporter;

import java.util.Collection;
import java.util.Set;

/**
 * A {@link SwappablePriorityQueue} that adds a {@code poll} variant which blocks for a bounded time
 * until a FlowFile becomes available. Producers ({@link #put} / {@link #putAll}) signal waiting
 * pollers through an internal monitor object.
 */
public class BlockingSwappablePriorityQueue extends SwappablePriorityQueue {
    private final Object monitor = new Object();

    public BlockingSwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue,
            final DropFlowFileAction dropAction, final String partitionName) {

        super(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, partitionName);
    }

    @Override
    public void put(final FlowFileRecord flowFile) {
        super.put(flowFile);

        // Wake a single waiting poller: one FlowFile was added.
        synchronized (monitor) {
            monitor.notify();
        }
    }

    @Override
    public void putAll(final Collection<FlowFileRecord> flowFiles) {
        super.putAll(flowFiles);

        // Several FlowFiles may have been added, so wake every waiting poller.
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /**
     * Polls for a FlowFile, blocking for at most roughly {@code waitMillis} milliseconds if none is available.
     *
     * @param expiredRecords passed through to the two-argument {@code poll} of the superclass
     * @param expirationMillis passed through to the two-argument {@code poll} of the superclass
     * @param waitMillis maximum time to wait in milliseconds; a value of zero or less polls once without waiting
     * @return the polled FlowFile, or {@code null} if none became available before the deadline
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
        final long now = System.currentTimeMillis();
        // Saturate instead of overflowing when a huge wait time is requested.
        final long maxTimestamp = waitMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitMillis;

        synchronized (monitor) {
            while (true) {
                final FlowFileRecord flowFile = super.poll(expiredRecords, expirationMillis);
                if (flowFile != null) {
                    return flowFile;
                }

                // Wait only for the time that is left. Waiting the full 'waitMillis' on every iteration could
                // block far longer than requested, and Object.wait(0) would block until notified.
                final long remainingMillis = maxTimestamp - System.currentTimeMillis();
                if (remainingMillis <= 0) {
                    return null;
                }

                monitor.wait(remainingMillis);
            }
        }
    }

    @Override
    public void inheritQueueContents(final FlowFileQueueContents queueContents) {
        // We have to override this method and synchronize on monitor before calling super.inheritQueueContents.
        // If we don't do this, then our super class will obtain the write lock and call putAll, which will cause
        // us to synchronize on monitor AFTER obtaining the write lock (WriteLock then monitor).
        // If poll() is then called, we will synchronize on monitor, THEN attempt to obtain the write lock (monitor then WriteLock),
        // which would cause a deadlock.
        synchronized (monitor) {
            super.inheritQueueContents(queueContents);
        }
    }

}

// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
// new file mode 100644
// index 0000000..a3ae6ee
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
// @@ -0,0 +1,24 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

/**
 * Callback interface with one trigger per end of a connection.
 *
 * <p>NOTE(review): presumably implementations notify the component at the source / destination of
 * the connection that owns a queue; the callers are not part of this change, so confirm there.</p>
 */
public interface ConnectionEventListener {
    void triggerSourceEvent();

    void triggerDestinationEvent();
}

// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
// new file mode 100644
// index 0000000..86cd169
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
// @@ -0,0 +1,27 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

import java.io.IOException;
import java.util.List;

import org.apache.nifi.controller.repository.FlowFileRecord;

/**
 * Strategy for dropping a batch of FlowFiles on behalf of a requestor.
 */
public interface DropFlowFileAction {
    /**
     * @param flowFiles the FlowFiles to drop
     * @param requestor the identity on whose behalf the FlowFiles are dropped
     * @return a {@link QueueSize} describing what was dropped
     * @throws IOException if the drop cannot be carried out
     */
    QueueSize drop(List<FlowFileRecord> flowFiles, String requestor) throws IOException;
}

// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
// new file mode 100644
// index 0000000..f47b4eb
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
// @@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

import java.util.Collections;
import java.util.List;

import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.claim.ContentClaim;

/**
 * A {@link RepositoryRecord} of type {@link RepositoryRecordType#DELETE} for a FlowFile that is being
 * dropped from a queue. The record has no destination, no swap location and no transient claims; its
 * current and original claim are both the FlowFile's content claim.
 */
public class DropFlowFileRepositoryRecord implements RepositoryRecord {
    private final FlowFileQueue queue;
    private final FlowFileRecord flowFile;

    /**
     * @param queue the queue the FlowFile is dropped from; reported as the original queue
     * @param flowFile the FlowFile being dropped
     */
    public DropFlowFileRepositoryRecord(final FlowFileQueue queue, final FlowFileRecord flowFile) {
        this.queue = queue;
        this.flowFile = flowFile;
    }

    @Override
    public FlowFileQueue getDestination() {
        return null;
    }

    @Override
    public FlowFileQueue getOriginalQueue() {
        return queue;
    }

    @Override
    public RepositoryRecordType getType() {
        return RepositoryRecordType.DELETE;
    }

    @Override
    public ContentClaim getCurrentClaim() {
        return flowFile.getContentClaim();
    }

    @Override
    public ContentClaim getOriginalClaim() {
        return flowFile.getContentClaim();
    }

    @Override
    public long getCurrentClaimOffset() {
        return flowFile.getContentClaimOffset();
    }

    @Override
    public FlowFileRecord getCurrent() {
        return flowFile;
    }

    @Override
    public boolean isAttributesChanged() {
        return false;
    }

    @Override
    public boolean isMarkedForAbort() {
        return false;
    }

    @Override
    public String getSwapLocation() {
        return null;
    }

    @Override
    public List<ContentClaim> getTransientClaims() {
        return Collections.emptyList();
    }
}

// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
// new file mode 100644
// index 0000000..60ad64d
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
// @@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

import org.apache.nifi.controller.repository.FlowFileRecord;

import java.util.List;

/**
 * Simple holder for the contents of a queue: the active FlowFiles, the swap locations, and the size of
 * the swapped data. The lists passed to the constructor are stored and returned as-is (no copies are made).
 */
public class FlowFileQueueContents {
    private final List<String> swapLocations;
    private final List<FlowFileRecord> activeFlowFiles;
    private final QueueSize swapSize;

    public FlowFileQueueContents(final List<FlowFileRecord> activeFlowFiles, final List<String> swapLocations, final QueueSize swapSize) {
        this.activeFlowFiles = activeFlowFiles;
        this.swapLocations = swapLocations;
        this.swapSize = swapSize;
    }

    public List<FlowFileRecord> getActiveFlowFiles() {
        return activeFlowFiles;
    }

    public List<String> getSwapLocations() {
        return swapLocations;
    }

    public QueueSize getSwapSize() {
        return swapSize;
    }
}

// Commit-mail header for this file:
// http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
// ----------------------------------------------------------------------
// diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
// new file mode 100644
// index 0000000..dc6667f
// --- /dev/null
// +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
// @@ -0,0 +1,22 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.queue;

/**
 * Factory for {@link FlowFileQueue} instances.
 */
public interface FlowFileQueueFactory {
    /**
     * @param loadBalanceStrategy the load-balance strategy for the new queue
     * @param partitioningAttribute the partitioning attribute that accompanies the strategy
     * @param connectionEventListener listener handed to the new queue
     * @return the newly created queue
     */
    FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java new file mode 100644 index 0000000..7ebc017 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class FlowFileQueueSize { + private final int activeQueueCount; + private final long activeQueueBytes; + private final int swappedCount; + private final long swappedBytes; + private final int swapFiles; + private final int unacknowledgedCount; + private final long unacknowledgedBytes; + + public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount, + final int unacknowledgedCount, final long unacknowledgedBytes) { + this.activeQueueCount = activeQueueCount; + this.activeQueueBytes = activeQueueBytes; + this.swappedCount = swappedCount; + this.swappedBytes = swappedBytes; + this.swapFiles = swapFileCount; + this.unacknowledgedCount = unacknowledgedCount; + this.unacknowledgedBytes = unacknowledgedBytes; + } + + public int getSwappedCount() { + return swappedCount; + } + + public long getSwappedBytes() { + return swappedBytes; + } + + public int getSwapFileCount() { + return swapFiles; + } + + public int getActiveCount() { + return activeQueueCount; + } + + public long getActiveBytes() { + return activeQueueBytes; + } + + public int getUnacknowledgedCount() { + return unacknowledgedCount; + } + + public long getUnacknowledgedBytes() { + return unacknowledgedBytes; + } + + public boolean isEmpty() { + return activeQueueCount == 0 && swappedCount == 0 && unacknowledgedCount == 0; + } + + public QueueSize toQueueSize() { + return new QueueSize(activeQueueCount + swappedCount + unacknowledgedCount, 
activeQueueBytes + swappedBytes + unacknowledgedBytes); + } + + public QueueSize activeQueueSize() { + return new QueueSize(activeQueueCount, activeQueueBytes); + } + + public QueueSize unacknowledgedQueueSize() { + return new QueueSize(unacknowledgedCount, unacknowledgedBytes); + } + + public QueueSize swapQueueSize() { + return new QueueSize(swappedCount, swappedBytes); + } + + @Override + public String toString() { + return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes + + " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes + + " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java new file mode 100644 index 0000000..9492435 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Value object holding a queue's maximum size in three forms: the textual size
 * as supplied by the caller, a byte limit and an object-count limit.
 */
public class MaxQueueSize {
    private final String maxSize;
    private final long maxBytes;
    private final long maxCount;

    /**
     * @param maxSize textual form of the size; used verbatim by {@link #toString()}
     * @param maxBytes the byte limit
     * @param maxCount the object-count limit
     */
    public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
        this.maxSize = maxSize;
    }

    /** @return the object-count limit */
    public long getMaxCount() {
        return this.maxCount;
    }

    /** @return the byte limit */
    public long getMaxBytes() {
        return this.maxBytes;
    }

    /** @return the textual size supplied at construction */
    public String getMaxSize() {
        return this.maxSize;
    }

    /**
     * @return the count limit, the text {@code " Objects/"} and the textual size, concatenated
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(maxCount);
        text.append(" Objects/");
        text.append(maxSize);
        return text.toString();
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class NopConnectionEventListener implements ConnectionEventListener { + @Override + public void triggerSourceEvent() { + } + + @Override + public void triggerDestinationEvent() { + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java new file mode 100644 index 0000000..b78ccff --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFilePrioritizer; + +public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable { + private static final long serialVersionUID = 1L; + private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); + + public QueuePrioritizer(final List<FlowFilePrioritizer> priorities) { + if (null != priorities) { + prioritizers.addAll(priorities); + } + } + + @Override + public int compare(final FlowFileRecord f1, final FlowFileRecord f2) { + int returnVal = 0; + final boolean f1Penalized = f1.isPenalized(); + final boolean f2Penalized = f2.isPenalized(); + + if (f1Penalized && !f2Penalized) { + return 1; + } else if (!f1Penalized && f2Penalized) { + return -1; + } + + if (f1Penalized && f2Penalized) { + if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) { + return -1; + } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) { + return 1; + } + } + + if (!prioritizers.isEmpty()) { + for (final FlowFilePrioritizer prioritizer : prioritizers) { + returnVal = prioritizer.compare(f1, f2); + if 
(returnVal != 0) { + return returnVal; + } + } + } + + final ContentClaim claim1 = f1.getContentClaim(); + final ContentClaim claim2 = f2.getContentClaim(); + + // put the one without a claim first + if (claim1 == null && claim2 != null) { + return -1; + } else if (claim1 != null && claim2 == null) { + return 1; + } else if (claim1 != null && claim2 != null) { + final int claimComparison = claim1.compareTo(claim2); + if (claimComparison != 0) { + return claimComparison; + } + + final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset()); + if (claimOffsetComparison != 0) { + return claimOffsetComparison; + } + } + + return Long.compare(f1.getId(), f2.getId()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java new file mode 100644 index 0000000..cab41e8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.queue; + +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.util.concurrency.TimedLock; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A FlowFileQueue is used to queue FlowFile objects that are awaiting further + * processing. Must be thread safe. 
+ * + */ +public class StandardFlowFileQueue extends AbstractFlowFileQueue implements FlowFileQueue { + + private final SwappablePriorityQueue queue; + private final ConnectionEventListener eventListener; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final FlowFileSwapManager swapManager; + private final TimedLock writeLock; + + + public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, + final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) { + + super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager); + this.swapManager = swapManager; + this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null); + this.eventListener = eventListener; + + writeLock = new TimedLock(this.lock.writeLock(), getIdentifier() + " Write Lock", 100); + + setBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold); + setBackPressureObjectThreshold(defaultBackPressureObjectThreshold); + } + + @Override + public void startLoadBalancing() { + } + + @Override + public void stopLoadBalancing() { + } + + @Override + public boolean isActivelyLoadBalancing() { + return false; + } + + @Override + public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { + queue.setPriorities(newPriorities); + } + + @Override + public List<FlowFilePrioritizer> getPriorities() { + return queue.getPriorities(); + } + + @Override + protected List<FlowFileRecord> getListableFlowFiles() { + return queue.getActiveFlowFiles(); + } + + @Override + public QueueDiagnostics getQueueDiagnostics() { + return new 
StandardQueueDiagnostics(queue.getQueueDiagnostics(), Collections.emptyList()); + } + + @Override + public void put(final FlowFileRecord file) { + queue.put(file); + + eventListener.triggerDestinationEvent(); + } + + @Override + public void putAll(final Collection<FlowFileRecord> files) { + queue.putAll(files); + + eventListener.triggerDestinationEvent(); + } + + + @Override + public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) { + // First check if we have any records Pre-Fetched. + final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS); + return queue.poll(expiredRecords, expirationMillis); + } + + + @Override + public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) { + return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS)); + } + + + + @Override + public void acknowledge(final FlowFileRecord flowFile) { + queue.acknowledge(flowFile); + + eventListener.triggerSourceEvent(); + } + + @Override + public void acknowledge(final Collection<FlowFileRecord> flowFiles) { + queue.acknowledge(flowFiles); + + eventListener.triggerSourceEvent(); + } + + @Override + public boolean isUnacknowledgedFlowFile() { + return queue.isUnacknowledgedFlowFile(); + } + + @Override + public QueueSize size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.getFlowFileQueueSize().isEmpty(); + } + + @Override + public boolean isActiveQueueEmpty() { + final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize(); + return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0; + } + + @Override + public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { + return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS)); + } + + @Override + public void purgeSwapFiles() { + swapManager.purge(); + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + 
return queue.recoverSwappedFlowFiles(); + } + + @Override + public String toString() { + return "FlowFileQueue[id=" + getIdentifier() + "]"; + } + + + @Override + public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException { + return queue.getFlowFile(flowFileUuid); + } + + + @Override + protected void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) { + queue.dropFlowFiles(dropRequest, requestor); + } + + + /** + * Lock the queue so that other threads are unable to interact with the queue + */ + public void lock() { + writeLock.lock(); + } + + /** + * Unlock the queue + */ + public void unlock() { + writeLock.unlock("external unlock"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java new file mode 100644 index 0000000..ff31e77 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class StandardLocalQueuePartitionDiagnostics implements LocalQueuePartitionDiagnostics { + private final FlowFileQueueSize queueSize; + private final boolean anyPenalized; + private final boolean allPenalized; + + public StandardLocalQueuePartitionDiagnostics(final FlowFileQueueSize queueSize, final boolean anyPenalized, final boolean allPenalized) { + this.queueSize = queueSize; + this.anyPenalized = anyPenalized; + this.allPenalized = allPenalized; + } + + @Override + public QueueSize getUnacknowledgedQueueSize() { + return new QueueSize(queueSize.getUnacknowledgedCount(), queueSize.getUnacknowledgedCount()); + } + + @Override + public QueueSize getActiveQueueSize() { + return new QueueSize(queueSize.getActiveCount(), queueSize.getActiveBytes()); + } + + @Override + public QueueSize getSwapQueueSize() { + return new QueueSize(queueSize.getSwappedCount(), queueSize.getSwappedBytes()); + } + + @Override + public int getSwapFileCount() { + return queueSize.getSwapFileCount(); + } + + @Override + public boolean isAnyActiveFlowFilePenalized() { + return anyPenalized; + } + + @Override + public boolean isAllActiveFlowFilesPenalized() { + return allPenalized; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java new file mode 100644 index 0000000..be42e2e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +import java.util.List; + +public class StandardQueueDiagnostics implements QueueDiagnostics { + final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics; + final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics; + + public StandardQueueDiagnostics(final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics, final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics) { + this.localQueuePartitionDiagnostics = localQueuePartitionDiagnostics; + this.remoteQueuePartitionDiagnostics = remoteQueuePartitionDiagnostics; + } + + @Override + public LocalQueuePartitionDiagnostics getLocalQueuePartitionDiagnostics() { + return localQueuePartitionDiagnostics; + } + + @Override + public List<RemoteQueuePartitionDiagnostics> getRemoteQueuePartitionDiagnostics() { + return remoteQueuePartitionDiagnostics; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java new file mode 100644 index 0000000..6790055 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class StandardRemoteQueuePartitionDiagnostics implements RemoteQueuePartitionDiagnostics { + private final String nodeId; + private final FlowFileQueueSize queueSize; + + public StandardRemoteQueuePartitionDiagnostics(final String nodeId, final FlowFileQueueSize queueSize) { + this.nodeId = nodeId; + this.queueSize = queueSize; + } + + @Override + public String getNodeIdentifier() { + return nodeId; + } + + @Override + public QueueSize getUnacknowledgedQueueSize() { + return new QueueSize(queueSize.getUnacknowledgedCount(), queueSize.getUnacknowledgedCount()); + } + + @Override + public QueueSize getActiveQueueSize() { + return new QueueSize(queueSize.getActiveCount(), queueSize.getActiveBytes()); + } + + @Override + public QueueSize getSwapQueueSize() { + return new QueueSize(queueSize.getSwappedCount(), queueSize.getSwappedBytes()); + } + + @Override + public int getSwapFileCount() { + return queueSize.getSwapFileCount(); + } +}
