markap14 commented on a change in pull request #4780:
URL: https://github.com/apache/nifi/pull/4780#discussion_r578651523



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
##########
@@ -920,7 +921,14 @@ public SwapSummary recoverSwappedFlowFiles() {
         return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
     }
 
+    public long getMaxActiveQueuedDuration(long fromTimestamp) {
+        // We want the oldest timestamp, which will be the min
+        return fromTimestamp - activeQueue.parallelStream().map(FlowFile::getLastQueueDate).filter(Objects::nonNull).min(Long::compareTo).orElse(fromTimestamp);

Review comment:
       Unfortunately, these numbers are not accurate if any FlowFiles have been 
swapped out. To get the max active queued duration, we would also have to 
consider the data in swap files. To do that efficiently, the oldest timestamp 
would need to be written to the swap file header before the data is swapped 
out. Even scanning the headers, though, would be too expensive to do 
regularly, so this class would also need to keep a mapping of each swap file 
to its earliest timestamp, and recover that mapping on startup when it 
recovers the swapped data.
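   A minimal sketch of that bookkeeping, assuming a hypothetical `SwapTimestampIndex` helper keyed by swap location (none of these names exist in NiFi). The earliest `lastQueueDate` is captured when a batch is swapped out, re-registered from the swap file header on startup, and combined with the active queue's earliest timestamp, so no swap file has to be read when the metric is requested.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers, per swap file, the earliest lastQueueDate of the
// FlowFiles written to it, so the oldest swapped-out FlowFile is known without I/O.
class SwapTimestampIndex {
    private final Map<String, Long> earliestBySwapLocation = new ConcurrentHashMap<>();

    // On swap-out: compute the earliest date in the batch (the same value would
    // also be written to the swap file header). Null dates are ignored.
    void recordSwapOut(String swapLocation, Collection<Long> lastQueueDates) {
        long earliest = Long.MAX_VALUE;
        for (Long date : lastQueueDates) {
            if (date != null && date < earliest) {
                earliest = date;
            }
        }
        if (earliest != Long.MAX_VALUE) {
            earliestBySwapLocation.put(swapLocation, earliest);
        }
    }

    // On startup: register the value read back from an existing swap file's header.
    void recover(String swapLocation, long earliestFromHeader) {
        earliestBySwapLocation.put(swapLocation, earliestFromHeader);
    }

    // On swap-in or deletion of a swap file: forget its entry.
    void remove(String swapLocation) {
        earliestBySwapLocation.remove(swapLocation);
    }

    // Earliest date across all swap files, or Long.MAX_VALUE if nothing is swapped out.
    long earliest() {
        long earliest = Long.MAX_VALUE;
        for (long date : earliestBySwapLocation.values()) {
            earliest = Math.min(earliest, date);
        }
        return earliest;
    }

    // Combines the swapped-out minimum with the active queue's minimum
    // (Long.MAX_VALUE meaning "empty"); returns 0 when nothing is queued at all.
    long maxQueuedDuration(long now, long earliestActive) {
        long earliest = Math.min(earliestActive, earliest());
        return earliest == Long.MAX_VALUE ? 0L : now - earliest;
    }
}
```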
   
   Additionally, we cannot access `activeQueue` outside of the read/write 
lock, so acquiring the read lock would need to be added here for thread 
safety. I would also avoid parallel streams here. Creating the parallel 
stream is almost certainly far more expensive than simply iterating over all 
of the elements in the queue with a for-each loop.
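   A sketch of the read-locked, for-each version. `ActiveQueueSketch` and `QueuedFlowFile` are hypothetical stand-ins for `SwappablePriorityQueue` and `FlowFile`; only `getLastQueueDate()` comes from the diff, and the real class's lock fields and queue type may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Stand-in for NiFi's FlowFile; only the method used here is modeled.
interface QueuedFlowFile {
    Long getLastQueueDate();
}

// Hypothetical, simplified queue illustrating the locking pattern only.
class ActiveQueueSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Queue<QueuedFlowFile> activeQueue = new ArrayDeque<>();

    void add(QueuedFlowFile flowFile) {
        lock.writeLock().lock();
        try {
            activeQueue.add(flowFile);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Earliest lastQueueDate in the active queue, or Long.MAX_VALUE if there is none.
    // Holds the read lock for the whole scan and uses a plain for-each (no streams).
    long getEarliestActiveQueueDate() {
        lock.readLock().lock();
        try {
            long earliest = Long.MAX_VALUE;
            for (QueuedFlowFile flowFile : activeQueue) {
                Long queueDate = flowFile.getLastQueueDate();
                if (queueDate != null && queueDate < earliest) {
                    earliest = queueDate;
                }
            }
            return earliest;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```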

##########
File path: 
nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
##########
@@ -93,6 +93,18 @@
 
     QueueSize size();
 
+    /**
+     * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+     * @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
+     */
+    long getTotalActiveQueuedDuration(long fromTimestamp);

Review comment:
       I'm not really sure what exactly this metric is telling us. How would 
this metric be used? It would also become very expensive to calculate once 
data has been swapped out.

##########
File path: 
nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
##########
@@ -93,6 +93,18 @@
 
     QueueSize size();
 
+    /**
+     * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+     * @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
+     */
+    long getTotalActiveQueuedDuration(long fromTimestamp);
+
+    /**
+     * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+     * @return the maximum time in milliseconds that any FlowFile within this queue has been in this queue.
+     */
+    long getMaxActiveQueuedDuration(long fromTimestamp);

Review comment:
       I'm not sure what exactly it means for the max active queued duration to 
be computed relative to a caller-supplied timestamp. It makes a lot more 
sense to me to accept no arguments here at all and just use the current time, 
or, alternatively, to return the timestamp of the longest-queued FlowFile.
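   One way the two suggested shapes could look, sketched as a hypothetical `QueuedDurationMetrics` interface. The method name `getMinLastQueueDate()` and the `-1` empty-queue sentinel are assumptions, not existing NiFi API.

```java
// Hypothetical alternative to the fromTimestamp-based signatures.
interface QueuedDurationMetrics {
    // Alternative 2: epoch millis of the longest-queued FlowFile's lastQueueDate,
    // or -1 if the queue is empty (the sentinel is an assumption).
    long getMinLastQueueDate();

    // Alternative 1: no arguments; the duration is measured against the current time.
    default long getMaxActiveQueuedDuration() {
        long earliest = getMinLastQueueDate();
        return earliest < 0 ? 0L : System.currentTimeMillis() - earliest;
    }
}
```

   Returning the raw timestamp lets a caller compute durations against whatever clock reading it already holds, which sidesteps the question of what a `fromTimestamp` argument should mean.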




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]