Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2947#discussion_r220975068
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
---
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractFlowFileQueue implements FlowFileQueue {
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractFlowFileQueue.class);
+ private final String identifier;
+ private final FlowFileRepository flowFileRepository;
+ private final ProvenanceEventRepository provRepository;
+ private final ResourceClaimManager resourceClaimManager;
+ private final ProcessScheduler scheduler;
+
+ private final AtomicReference<TimePeriod> expirationPeriod = new
AtomicReference<>(new TimePeriod("0 mins", 0L));
+ private final AtomicReference<MaxQueueSize> maxQueueSize = new
AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
+
+ private final ConcurrentMap<String, ListFlowFileRequest>
listRequestMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, DropFlowFileRequest>
dropRequestMap = new ConcurrentHashMap<>();
+
+ private LoadBalanceStrategy loadBalanceStrategy =
LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
+ private String partitioningAttribute = null;
+
+ private LoadBalanceCompression compression =
LoadBalanceCompression.DO_NOT_COMPRESS;
+
+
    /**
     * Creates a new queue for the Connection with the given identifier.
     *
     * @param identifier unique identifier of this queue (the Connection's id)
     * @param scheduler process scheduler, exposed to subclasses via {@link #getScheduler()}
     * @param flowFileRepo repository updated when FlowFiles are deleted from this queue (see drop())
     * @param provRepo repository that receives DROP provenance events when FlowFiles are dropped
     * @param resourceClaimManager used to decrement content claimant counts when FlowFiles are dropped
     */
    public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
            final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
        this.identifier = identifier;
        this.scheduler = scheduler;
        this.flowFileRepository = flowFileRepo;
        this.provRepository = provRepo;
        this.resourceClaimManager = resourceClaimManager;
    }
+
    /**
     * @return the unique identifier of this queue, as supplied at construction time
     */
    @Override
    public String getIdentifier() {
        return identifier;
    }
+
    /**
     * @return the ProcessScheduler supplied at construction time, for use by subclasses
     */
    protected ProcessScheduler getScheduler() {
        return scheduler;
    }
+
    /**
     * @return the configured expiration period exactly as the user entered it (e.g. "0 mins")
     */
    @Override
    public String getFlowFileExpiration() {
        return expirationPeriod.get().getPeriod();
    }
+
    /**
     * @param timeUnit the unit to express the expiration period in
     * @return the configured expiration period converted to the given unit;
     *         note the narrowing cast to int may truncate very large periods
     */
    @Override
    public int getFlowFileExpiration(final TimeUnit timeUnit) {
        return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
    }
+
+ @Override
+ public void setFlowFileExpiration(final String flowExpirationPeriod) {
+ final long millis =
FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
+ if (millis < 0) {
+ throw new IllegalArgumentException("FlowFile Expiration Period
must be positive");
+ }
+
+ expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
+ }
+
+ @Override
+ public void setBackPressureObjectThreshold(final long threshold) {
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = getMaxQueueSize();
+ final MaxQueueSize updatedSize = new
MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
+ }
+ }
+
    /**
     * @return the configured maximum number of FlowFiles before back pressure applies
     */
    @Override
    public long getBackPressureObjectThreshold() {
        return getMaxQueueSize().getMaxCount();
    }
+
+ @Override
+ public void setBackPressureDataSizeThreshold(final String maxDataSize)
{
+ final long maxBytes = DataUnit.parseDataSize(maxDataSize,
DataUnit.B).longValue();
+
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = getMaxQueueSize();
+ final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize,
maxBytes, maxSize.getMaxCount());
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
+ }
+ }
+
    /**
     * @return the configured maximum data size exactly as the user entered it (e.g. "1 GB")
     */
    @Override
    public String getBackPressureDataSizeThreshold() {
        return getMaxQueueSize().getMaxSize();
    }
+
    // Convenience accessor for the current atomically-held size limits.
    private MaxQueueSize getMaxQueueSize() {
        return maxQueueSize.get();
    }
+
+ @Override
+ public boolean isFull() {
+ final MaxQueueSize maxSize = getMaxQueueSize();
+
+ // Check if max size is set
+ if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
+ return false;
+ }
+
+ final QueueSize queueSize = size();
+ if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >=
maxSize.getMaxCount()) {
+ return true;
+ }
+
+ if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >=
maxSize.getMaxBytes()) {
+ return true;
+ }
+
+ return false;
+ }
+
+
+ @Override
+ public ListFlowFileStatus listFlowFiles(final String
requestIdentifier, final int maxResults) {
+ // purge any old requests from the map just to keep it clean. But
if there are very few requests, which is usually the case, then don't bother
+ if (listRequestMap.size() > 10) {
+ final List<String> toDrop = new ArrayList<>();
+ for (final Map.Entry<String, ListFlowFileRequest> entry :
listRequestMap.entrySet()) {
+ final ListFlowFileRequest request = entry.getValue();
+ final boolean completed = request.getState() ==
ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+ if (completed && System.currentTimeMillis() -
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+ toDrop.add(entry.getKey());
+ }
+ }
+
+ for (final String requestId : toDrop) {
+ listRequestMap.remove(requestId);
+ }
+ }
+
+ // numSteps = 1 for each swap location + 1 for active queue + 1
for swap queue.
+ final ListFlowFileRequest listRequest = new
ListFlowFileRequest(requestIdentifier, maxResults, size());
+
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int position = 0;
+ int resultCount = 0;
+ final List<FlowFileSummary> summaries = new ArrayList<>();
+
+ // Create an ArrayList that contains all of the contents
of the active queue.
+ // We do this so that we don't have to hold the lock any
longer than absolutely necessary.
+ // We cannot simply pull the first 'maxResults' records
from the queue, however, because the
+ // Iterator provided by PriorityQueue does not return
records in order. So we would have to either
+ // use a writeLock and 'pop' the first 'maxResults'
records off the queue or use a read lock and
+ // do a shallow copy of the queue. The shallow copy is
generally quicker because it doesn't have to do
+ // the sorting to put the records back. So even though
this has an expensive of Java Heap to create the
+ // extra collection, we are making this trade-off to avoid
locking the queue any longer than required.
+ final List<FlowFileRecord> allFlowFiles =
getListableFlowFiles();
+ final QueuePrioritizer prioritizer = new
QueuePrioritizer(getPriorities());
+
+ listRequest.setState(ListFlowFileState.CALCULATING_LIST);
+
+ // sort the FlowFileRecords so that we have the list in
the same order as on the queue.
+ allFlowFiles.sort(prioritizer);
+
+ for (final FlowFileRecord flowFile : allFlowFiles) {
+ summaries.add(summarize(flowFile, ++position));
+ if (summaries.size() >= maxResults) {
+ break;
+ }
+ }
+
+ logger.debug("{} Finished listing FlowFiles for active
queue with a total of {} results", this, resultCount);
+ listRequest.setFlowFileSummaries(summaries);
+ listRequest.setState(ListFlowFileState.COMPLETE);
+ }
+ }, "List FlowFiles for Connection " + getIdentifier());
+ t.setDaemon(true);
+ t.start();
+
+ listRequestMap.put(requestIdentifier, listRequest);
+ return listRequest;
+ }
+
    /**
     * @param requestIdentifier identifier of a previously submitted listing request
     * @return the status of that request, or null if no such request is being tracked
     */
    @Override
    public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
        return listRequestMap.get(requestIdentifier);
    }
+
+ @Override
+ public ListFlowFileStatus cancelListFlowFileRequest(final String
requestIdentifier) {
+ logger.info("Canceling ListFlowFile Request with ID {}",
requestIdentifier);
+ final ListFlowFileRequest request =
listRequestMap.remove(requestIdentifier);
+ if (request != null) {
+ request.cancel();
+ }
+
+ return request;
+ }
+
    /**
     * Provides the snapshot of FlowFiles that listFlowFiles() sorts and summarizes.
     *
     * @return all FlowFiles that should be listed in response to a List Queue request
     */
    protected abstract List<FlowFileRecord> getListableFlowFiles();
+
+
    /**
     * Asynchronously drops all FlowFiles from this queue on behalf of the given requestor.
     * If the queue is already empty the request completes immediately without spawning a
     * thread; otherwise the (abstract) synchronous dropFlowFiles(request, requestor) is run
     * on a daemon thread and callers poll the returned status for progress.
     *
     * @param requestIdentifier identifier under which the drop request is tracked
     * @param requestor identity of the user/agent making the request
     * @return the status object for the newly submitted drop request
     */
    @Override
    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
        logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);

        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
        if (dropRequestMap.size() > 10) {
            final List<String> toDrop = new ArrayList<>();
            for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
                final DropFlowFileRequest request = entry.getValue();
                final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;

                // keep finished requests around for 5 minutes so callers have time to observe the outcome
                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
                    toDrop.add(entry.getKey());
                }
            }

            for (final String requestId : toDrop) {
                dropRequestMap.remove(requestId);
            }
        }

        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
        final QueueSize originalSize = size();
        dropRequest.setCurrentSize(originalSize);
        dropRequest.setOriginalSize(originalSize);
        // Empty queue: complete the request synchronously rather than starting a thread.
        if (originalSize.getObjectCount() == 0) {
            dropRequest.setDroppedSize(originalSize);
            dropRequest.setState(DropFlowFileState.COMPLETE);
            dropRequestMap.put(requestIdentifier, dropRequest);
            return dropRequest;
        }

        final Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                dropFlowFiles(dropRequest, requestor);
            }
        }, "Drop FlowFiles for Connection " + getIdentifier());
        t.setDaemon(true);
        t.start();

        dropRequestMap.put(requestIdentifier, dropRequest);

        return dropRequest;
    }
+
+
+ @Override
+ public DropFlowFileRequest cancelDropFlowFileRequest(final String
requestIdentifier) {
+ final DropFlowFileRequest request =
dropRequestMap.remove(requestIdentifier);
+ if (request == null) {
+ return null;
+ }
+
+ request.cancel();
+ return request;
+ }
+
    /**
     * @param requestIdentifier identifier of a previously submitted drop request
     * @return the status of that request, or null if no such request is being tracked
     */
    @Override
    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
        return dropRequestMap.get(requestIdentifier);
    }
+
    /**
     * Synchronously drops all FlowFiles in the queue. Invoked on a background daemon
     * thread by the public dropFlowFiles(requestIdentifier, requestor) method.
     *
     * @param dropRequest the request, which the implementation should keep updated with progress/state
     * @param requestor the identity of the user/agent who made the request
     */
    protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor);
+
    /**
     * {@inheritDoc}
     *
     * <p>Listing is always permitted for this queue, so this implementation is a no-op
     * and never throws.</p>
     */
    @Override
    public void verifyCanList() throws IllegalStateException {
    }
+
+
+ protected FlowFileSummary summarize(final FlowFile flowFile, final int
position) {
+ // extract all of the information that we care about into new
variables rather than just
+ // wrapping the FlowFile object with a FlowFileSummary object. We
do this because we want to
+ // be able to hold many FlowFileSummary objects in memory and if
we just wrap the FlowFile object,
+ // we will end up holding the entire FlowFile (including all
Attributes) in the Java heap as well,
+ // which can be problematic if we expect them to be swapped out.
+ final String uuid =
flowFile.getAttribute(CoreAttributes.UUID.key());
+ final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final long size = flowFile.getSize();
+ final Long lastQueuedTime = flowFile.getLastQueueDate();
+ final long lineageStart = flowFile.getLineageStartDate();
+ final boolean penalized = flowFile.isPenalized();
+
+ return new FlowFileSummary() {
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public String getFilename() {
+ return filename;
+ }
+
+ @Override
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public long getLastQueuedTime() {
+ return lastQueuedTime == null ? 0L : lastQueuedTime;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return lineageStart;
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return penalized;
+ }
+ };
+ }
+
    /**
     * Deletes the given FlowFiles: registers a DROP provenance event for each, records a
     * delete in the FlowFile Repository, and decrements content claimant counts so that
     * the underlying content can eventually be reclaimed.
     *
     * @param flowFiles the FlowFiles to drop
     * @param requestor the identity of the user/agent who requested the drop (recorded in provenance)
     * @return the total size (object count and bytes) of the dropped FlowFiles
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
        // Create a Provenance Event and a FlowFile Repository record for each FlowFile
        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
        for (final FlowFileRecord flowFile : flowFiles) {
            provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
        }

        long dropContentSize = 0L;
        for (final FlowFileRecord flowFile : flowFiles) {
            dropContentSize += flowFile.getSize();
            final ContentClaim contentClaim = flowFile.getContentClaim();
            if (contentClaim == null) {
                // FlowFile has no content; nothing to release.
                continue;
            }

            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            if (resourceClaim == null) {
                continue;
            }

            // Release this FlowFile's hold on its content claim.
            resourceClaimManager.decrementClaimantCount(resourceClaim);
        }

        // Provenance events are registered before the FlowFile Repository update.
        provRepository.registerEvents(provenanceEvents);
        flowFileRepository.updateRepository(flowFileRepoRecords);
        return new QueueSize(flowFiles.size(), dropContentSize);
    }
+
+ private ProvenanceEventRecord createDropProvenanceEvent(final
FlowFileRecord flowFile, final String requestor) {
+ final ProvenanceEventBuilder builder =
provRepository.eventBuilder();
+ builder.fromFlowFile(flowFile);
+ builder.setEventType(ProvenanceEventType.DROP);
+ builder.setLineageStartDate(flowFile.getLineageStartDate());
+ builder.setComponentId(getIdentifier());
+ builder.setComponentType("Connection");
+ builder.setAttributes(flowFile.getAttributes(),
Collections.<String, String> emptyMap());
+ builder.setDetails("FlowFile Queue emptied by " + requestor);
+ builder.setSourceQueueIdentifier(getIdentifier());
+
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim != null) {
+ final ResourceClaim resourceClaim =
contentClaim.getResourceClaim();
+ builder.setPreviousContentClaim(resourceClaim.getContainer(),
resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(),
flowFile.getSize());
+ }
+
+ return builder.build();
+ }
+
    // Wraps the FlowFile in a repository record representing its deletion from this queue.
    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
        return new DropFlowFileRepositoryRecord(this, flowFile);
    }
+
+ @Override
+ public synchronized void setLoadBalanceStrategy(final
LoadBalanceStrategy strategy, final String partitioningAttribute) {
+ if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE &&
partitioningAttribute == null) {
--- End diff --
I'm ok with the ` && !partitioningAttribute.isEmpty()` - but really if the
partitioning attribute is null or empty and strategy ==
LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE, we probably should throw an
Exception instead.
Re: nullifying this.partitioningAttribute and resetting compression - I
disagree. If a user is using Partition By Attribute, then changes it to Round
Robin by accident, for instance, and wants to change it back, we should keep
the Partitioning Attribute. Or said another way, we should not assume that the
user wants the field to be cleared. Similarly, if the user is load balancing,
then decides to turn off load balancing (for example, to see a difference in
performance), we should not reset their configuration. If they want to set it
back, they would likely (at least I would) expect the previously configured
compression value to remain.
---