Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2947#discussion_r219366959
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
---
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractFlowFileQueue implements FlowFileQueue {
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractFlowFileQueue.class);
+ private final String identifier;
+ private final FlowFileRepository flowFileRepository;
+ private final ProvenanceEventRepository provRepository;
+ private final ResourceClaimManager resourceClaimManager;
+ private final ProcessScheduler scheduler;
+
+ private final AtomicReference<TimePeriod> expirationPeriod = new
AtomicReference<>(new TimePeriod("0 mins", 0L));
+ private final AtomicReference<MaxQueueSize> maxQueueSize = new
AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
+
+ private final ConcurrentMap<String, ListFlowFileRequest>
listRequestMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, DropFlowFileRequest>
dropRequestMap = new ConcurrentHashMap<>();
+
+ private LoadBalanceStrategy loadBalanceStrategy =
LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
+ private String partitioningAttribute = null;
+
+ private LoadBalanceCompression compression =
LoadBalanceCompression.DO_NOT_COMPRESS;
+
+
/**
 * Creates a queue bound to the given identifier and collaborators.
 *
 * @param identifier identifier of this queue; also used as the component id of DROP provenance events
 * @param scheduler process scheduler, made available to subclasses through {@link #getScheduler()}
 * @param flowFileRepo FlowFile repository that receives the records written when FlowFiles are dropped
 * @param provRepo provenance repository used to build and register DROP events
 * @param resourceClaimManager manager whose claimant counts are decremented when FlowFiles are dropped
 */
public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
        final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
        final ResourceClaimManager resourceClaimManager) {
    this.flowFileRepository = flowFileRepo;
    this.provRepository = provRepo;
    this.resourceClaimManager = resourceClaimManager;
    this.identifier = identifier;
    this.scheduler = scheduler;
}
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ protected ProcessScheduler getScheduler() {
+ return scheduler;
+ }
+
+ @Override
+ public String getFlowFileExpiration() {
+ return expirationPeriod.get().getPeriod();
+ }
+
+ @Override
+ public int getFlowFileExpiration(final TimeUnit timeUnit) {
+ return (int) timeUnit.convert(expirationPeriod.get().getMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setFlowFileExpiration(final String flowExpirationPeriod) {
+ final long millis =
FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
+ if (millis < 0) {
+ throw new IllegalArgumentException("FlowFile Expiration Period
must be positive");
+ }
+
+ expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
+ }
+
+ @Override
+ public void setBackPressureObjectThreshold(final long threshold) {
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = getMaxQueueSize();
+ final MaxQueueSize updatedSize = new
MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
+ }
+ }
+
+ @Override
+ public long getBackPressureObjectThreshold() {
+ return getMaxQueueSize().getMaxCount();
+ }
+
+ @Override
+ public void setBackPressureDataSizeThreshold(final String maxDataSize)
{
+ final long maxBytes = DataUnit.parseDataSize(maxDataSize,
DataUnit.B).longValue();
+
+ boolean updated = false;
+ while (!updated) {
+ MaxQueueSize maxSize = getMaxQueueSize();
+ final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize,
maxBytes, maxSize.getMaxCount());
+ updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
+ }
+ }
+
+ @Override
+ public String getBackPressureDataSizeThreshold() {
+ return getMaxQueueSize().getMaxSize();
+ }
+
+ private MaxQueueSize getMaxQueueSize() {
+ return maxQueueSize.get();
+ }
+
+ @Override
+ public boolean isFull() {
+ final MaxQueueSize maxSize = getMaxQueueSize();
+
+ // Check if max size is set
+ if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
+ return false;
+ }
+
+ final QueueSize queueSize = size();
+ if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >=
maxSize.getMaxCount()) {
+ return true;
+ }
+
+ if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >=
maxSize.getMaxBytes()) {
+ return true;
+ }
+
+ return false;
+ }
+
+
/**
 * Starts an asynchronous listing of the FlowFiles in this queue.
 * <p>
 * The work runs on a daemon thread. Callers poll progress through
 * {@link #getListFlowFileStatus(String)} with {@code requestIdentifier}. If listing fails with
 * an unchecked exception, the request is moved to {@code FAILURE} so it does not stay pending
 * forever and becomes eligible for purging.
 *
 * @param requestIdentifier identifier under which the request is registered
 * @param maxResults maximum number of FlowFile summaries to produce
 * @return the newly created request
 */
@Override
public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
    purgeStaleListRequests();

    final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());

    final Thread t = new Thread(() -> populateListRequest(listRequest, maxResults),
        "List FlowFiles for Connection " + getIdentifier());
    t.setDaemon(true);
    t.start();

    listRequestMap.put(requestIdentifier, listRequest);
    return listRequest;
}

/**
 * Body of the listing thread: fetches the listable FlowFiles, sorts them by the queue's
 * prioritizers, and stores up to {@code maxResults} summaries on the request.
 */
private void populateListRequest(final ListFlowFileRequest listRequest, final int maxResults) {
    try {
        // The list is sorted in place below, so it must be a mutable copy that is not shared
        // with the live queue. NOTE(review): relies on getListableFlowFiles() honoring this.
        final List<FlowFileRecord> allFlowFiles = getListableFlowFiles();
        final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities());

        listRequest.setState(ListFlowFileState.CALCULATING_LIST);

        // Sort so the listing matches the order defined by the queue's prioritizers.
        allFlowFiles.sort(prioritizer);

        final List<FlowFileSummary> summaries = new ArrayList<>();
        int position = 0;
        for (final FlowFileRecord flowFile : allFlowFiles) {
            // Check before adding so that maxResults <= 0 yields an empty listing.
            if (summaries.size() >= maxResults) {
                break;
            }
            summaries.add(summarize(flowFile, ++position));
        }

        // 'this' is the queue here (not an anonymous Runnable), so the log identifies the connection.
        logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", this, summaries.size());
        listRequest.setFlowFileSummaries(summaries);
        listRequest.setState(ListFlowFileState.COMPLETE);
    } catch (final RuntimeException e) {
        logger.error("Failed to list FlowFiles for {}", this, e);
        listRequest.setState(ListFlowFileState.FAILURE);
    }
}

/**
 * Removes finished (COMPLETE or FAILURE) list requests that were last updated more than five
 * minutes ago. The scan is skipped while the map holds 10 or fewer requests, which is the usual case.
 */
private void purgeStaleListRequests() {
    if (listRequestMap.size() <= 10) {
        return;
    }

    final long cutoff = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L);
    listRequestMap.values().removeIf(request -> {
        final ListFlowFileState state = request.getState();
        final boolean completed = state == ListFlowFileState.COMPLETE || state == ListFlowFileState.FAILURE;
        return completed && request.getLastUpdated() < cutoff;
    });
}
+
+ @Override
+ public ListFlowFileStatus getListFlowFileStatus(final String
requestIdentifier) {
+ return listRequestMap.get(requestIdentifier);
+ }
+
+ @Override
+ public ListFlowFileStatus cancelListFlowFileRequest(final String
requestIdentifier) {
+ logger.info("Canceling ListFlowFile Request with ID {}",
requestIdentifier);
+ final ListFlowFileRequest request =
listRequestMap.remove(requestIdentifier);
+ if (request != null) {
+ request.cancel();
+ }
+
+ return request;
+ }
+
+ /**
+ * @return all FlowFiles that should be listed in response to a List
Queue request
+ */
+ protected abstract List<FlowFileRecord> getListableFlowFiles();
+
+
+ @Override
+ public DropFlowFileStatus dropFlowFiles(final String
requestIdentifier, final String requestor) {
+ logger.info("Initiating drop of FlowFiles from {} on behalf of {}
(request identifier={})", this, requestor, requestIdentifier);
+
+ // purge any old requests from the map just to keep it clean. But
if there are very requests, which is usually the case, then don't bother
+ if (dropRequestMap.size() > 10) {
+ final List<String> toDrop = new ArrayList<>();
+ for (final Map.Entry<String, DropFlowFileRequest> entry :
dropRequestMap.entrySet()) {
+ final DropFlowFileRequest request = entry.getValue();
+ final boolean completed = request.getState() ==
DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
+
+ if (completed && System.currentTimeMillis() -
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+ toDrop.add(entry.getKey());
+ }
+ }
+
+ for (final String requestId : toDrop) {
+ dropRequestMap.remove(requestId);
+ }
+ }
+
+ final DropFlowFileRequest dropRequest = new
DropFlowFileRequest(requestIdentifier);
+ final QueueSize originalSize = size();
+ dropRequest.setCurrentSize(originalSize);
+ dropRequest.setOriginalSize(originalSize);
+ if (originalSize.getObjectCount() == 0) {
+ dropRequest.setDroppedSize(originalSize);
+ dropRequest.setState(DropFlowFileState.COMPLETE);
+ dropRequestMap.put(requestIdentifier, dropRequest);
+ return dropRequest;
+ }
+
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ dropFlowFiles(dropRequest, requestor);
+ }
+ }, "Drop FlowFiles for Connection " + getIdentifier());
+ t.setDaemon(true);
+ t.start();
+
+ dropRequestMap.put(requestIdentifier, dropRequest);
+
+ return dropRequest;
+ }
+
+
+ @Override
+ public DropFlowFileRequest cancelDropFlowFileRequest(final String
requestIdentifier) {
+ final DropFlowFileRequest request =
dropRequestMap.remove(requestIdentifier);
+ if (request == null) {
+ return null;
+ }
+
+ request.cancel();
+ return request;
+ }
+
+ @Override
+ public DropFlowFileStatus getDropFlowFileStatus(final String
requestIdentifier) {
+ return dropRequestMap.get(requestIdentifier);
+ }
+
+ /**
+ * Synchronously drops all FlowFiles in the queue
+ *
+ * @param dropRequest the request
+ * @param requestor the identity of the user/agent who made the request
+ */
+ protected abstract void dropFlowFiles(final DropFlowFileRequest
dropRequest, final String requestor);
+
+ @Override
+ public void verifyCanList() throws IllegalStateException {
+ }
+
+
+ protected FlowFileSummary summarize(final FlowFile flowFile, final int
position) {
+ // extract all of the information that we care about into new
variables rather than just
+ // wrapping the FlowFile object with a FlowFileSummary object. We
do this because we want to
+ // be able to hold many FlowFileSummary objects in memory and if
we just wrap the FlowFile object,
+ // we will end up holding the entire FlowFile (including all
Attributes) in the Java heap as well,
+ // which can be problematic if we expect them to be swapped out.
+ final String uuid =
flowFile.getAttribute(CoreAttributes.UUID.key());
+ final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final long size = flowFile.getSize();
+ final Long lastQueuedTime = flowFile.getLastQueueDate();
+ final long lineageStart = flowFile.getLineageStartDate();
+ final boolean penalized = flowFile.isPenalized();
+
+ return new FlowFileSummary() {
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public String getFilename() {
+ return filename;
+ }
+
+ @Override
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public long getLastQueuedTime() {
+ return lastQueuedTime == null ? 0L : lastQueuedTime;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return lineageStart;
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return penalized;
+ }
+ };
+ }
+
+ protected QueueSize drop(final List<FlowFileRecord> flowFiles, final
String requestor) throws IOException {
+ // Create a Provenance Event and a FlowFile Repository record for
each FlowFile
+ final List<ProvenanceEventRecord> provenanceEvents = new
ArrayList<>(flowFiles.size());
+ final List<RepositoryRecord> flowFileRepoRecords = new
ArrayList<>(flowFiles.size());
+ for (final FlowFileRecord flowFile : flowFiles) {
+ provenanceEvents.add(createDropProvenanceEvent(flowFile,
requestor));
+
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
+ }
+
+ long dropContentSize = 0L;
+ for (final FlowFileRecord flowFile : flowFiles) {
+ dropContentSize += flowFile.getSize();
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim == null) {
+ continue;
+ }
+
+ final ResourceClaim resourceClaim =
contentClaim.getResourceClaim();
+ if (resourceClaim == null) {
+ continue;
+ }
+
+ resourceClaimManager.decrementClaimantCount(resourceClaim);
+ }
+
+ provRepository.registerEvents(provenanceEvents);
+ flowFileRepository.updateRepository(flowFileRepoRecords);
+ return new QueueSize(flowFiles.size(), dropContentSize);
+ }
+
+ private ProvenanceEventRecord createDropProvenanceEvent(final
FlowFileRecord flowFile, final String requestor) {
+ final ProvenanceEventBuilder builder =
provRepository.eventBuilder();
+ builder.fromFlowFile(flowFile);
+ builder.setEventType(ProvenanceEventType.DROP);
+ builder.setLineageStartDate(flowFile.getLineageStartDate());
+ builder.setComponentId(getIdentifier());
+ builder.setComponentType("Connection");
+ builder.setAttributes(flowFile.getAttributes(),
Collections.<String, String> emptyMap());
+ builder.setDetails("FlowFile Queue emptied by " + requestor);
+ builder.setSourceQueueIdentifier(getIdentifier());
+
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim != null) {
+ final ResourceClaim resourceClaim =
contentClaim.getResourceClaim();
+ builder.setPreviousContentClaim(resourceClaim.getContainer(),
resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(),
flowFile.getSize());
+ }
+
+ return builder.build();
+ }
+
+ private RepositoryRecord createDeleteRepositoryRecord(final
FlowFileRecord flowFile) {
+ return new DropFlowFileRepositoryRecord(this, flowFile);
+ }
+
+ @Override
+ public synchronized void setLoadBalanceStrategy(final
LoadBalanceStrategy strategy, final String partitioningAttribute) {
+ if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE &&
partitioningAttribute == null) {
--- End diff --
To keep the instance variables consistent:
- We should also reject an empty partitioning attribute, i.e. check
`partitioningAttribute == null || partitioningAttribute.isEmpty()`
- We should set this.partitioningAttribute to null if the strategy is not
PARTITION_BY_ATTRIBUTE
- We should reset this.compression to DO_NOT_COMPRESS if the strategy is
DO_NOT_LOAD_BALANCE
---