markap14 commented on a change in pull request #4780:
URL: https://github.com/apache/nifi/pull/4780#discussion_r578651523
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
##########
@@ -920,7 +921,14 @@ public SwapSummary recoverSwappedFlowFiles() {
return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
}
+ public long getMaxActiveQueuedDuration(long fromTimestamp) {
+ // We want the oldest timestamp, which will be the min
+ return fromTimestamp - activeQueue.parallelStream().map(FlowFile::getLastQueueDate).filter(Objects::nonNull).min(Long::compareTo).orElse(fromTimestamp);
Review comment:
Unfortunately, these numbers are not accurate if any FlowFiles have been
swapped out. To get the max active queued duration, we would also have to
consider any data in swap files. To do that efficiently, the oldest timestamp
would need to be written to the swap file header before the data is swapped
out. Even scanning the headers, though, would be too expensive to do
regularly, so this class would also need to keep a mapping of swap file to
earliest timestamp, and recover that mapping on startup when recovering
swapped data. Additionally, `activeQueue` must not be accessed without holding
the read/write lock, so the lock would need to be acquired here for thread
safety.
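A rough sketch of the bookkeeping described above, assuming a hypothetical `SwapAwareQueueSketch` class and a stand-in `Queued` interface in place of `FlowFile` (neither is NiFi's API). It keeps one earliest timestamp per swap location, takes the read lock while reading `activeQueue`, and never opens a swap file to answer the query. Writing the timestamp into the swap file header and removing entries from the active queue on swap-out are left out.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not NiFi's SwappablePriorityQueue: keeps the earliest queue
// timestamp per swap file so the oldest entry can be found without reading swap files.
final class SwapAwareQueueSketch {

    // Hypothetical stand-in for FlowFile; only the last-queue timestamp matters here.
    interface Queued {
        Long getLastQueueDate();
    }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Collection<Queued> activeQueue = new LinkedList<>();
    private final Map<String, Long> earliestTimestampBySwapLocation = new HashMap<>();

    void enqueue(Queued item) {
        lock.writeLock().lock();
        try {
            activeQueue.add(item);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Called after a batch has been written to a swap file (the same value would also go
    // into that file's header), or on startup with the value read back from the header.
    void recordSwapFile(String swapLocation, long earliestTimestamp) {
        lock.writeLock().lock();
        try {
            earliestTimestampBySwapLocation.put(swapLocation, earliestTimestamp);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Called when a swap file is read back in; its entries rejoin the active queue.
    void swapIn(String swapLocation, Collection<? extends Queued> items) {
        lock.writeLock().lock();
        try {
            earliestTimestampBySwapLocation.remove(swapLocation);
            activeQueue.addAll(items);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Oldest timestamp across in-memory entries and all swap files, under the read lock.
    long getMaxActiveQueuedDuration(long fromTimestamp) {
        lock.readLock().lock();
        try {
            long oldest = fromTimestamp;
            for (Queued item : activeQueue) {
                Long lastQueueDate = item.getLastQueueDate();
                if (lastQueueDate != null && lastQueueDate < oldest) {
                    oldest = lastQueueDate;
                }
            }
            for (long earliest : earliestTimestampBySwapLocation.values()) {
                oldest = Math.min(oldest, earliest);
            }
            return fromTimestamp - oldest;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The map lookup costs one comparison per swap file, so the query stays cheap even when many FlowFiles are swapped out.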
I would also avoid parallel streams here. Creating the parallel stream is
almost certainly far more expensive than simply iterating over all of the
elements in the queue with a for-each loop.
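For comparison, a minimal for-each version of the same calculation over the in-memory queue, using a hypothetical `Queued` stand-in for `FlowFile`. Unlike the `min(...).orElse(fromTimestamp)` chain in the diff, it starts from `fromTimestamp`, so a timestamp later than `fromTimestamp` cannot produce a negative duration. It still ignores swapped-out data and assumes the caller already holds the queue's read lock.

```java
import java.util.Collection;

// Sketch: find the oldest last-queue timestamp with a plain loop instead of a parallel stream.
final class OldestQueuedSketch {

    // Hypothetical stand-in for FlowFile; only the last-queue timestamp is needed.
    interface Queued {
        Long getLastQueueDate();
    }

    // Returns fromTimestamp minus the oldest non-null timestamp that is not after
    // fromTimestamp, or 0 if there is no such timestamp.
    static long maxQueuedDuration(Collection<? extends Queued> queue, long fromTimestamp) {
        long oldest = fromTimestamp;
        for (Queued item : queue) {
            Long lastQueueDate = item.getLastQueueDate();
            if (lastQueueDate != null && lastQueueDate < oldest) {
                oldest = lastQueueDate;
            }
        }
        return fromTimestamp - oldest;
    }
}
```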
##########
File path:
nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
##########
@@ -93,6 +93,18 @@
QueueSize size();
+ /**
+ * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+ * @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
+ */
+ long getTotalActiveQueuedDuration(long fromTimestamp);
Review comment:
I'm not really sure what this metric is telling us. How would it be used?
It would also become very expensive to calculate once data has been swapped
out, since every swapped-out FlowFile would have to be accounted for in the sum.
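To make the cost concrete, a sketch of what `getTotalActiveQueuedDuration` computes according to its javadoc, using a hypothetical `Queued` stand-in for `FlowFile`. Every queued entry contributes one term, so a complete answer must account for each swapped-out FlowFile as well, not only those in `activeQueue`.

```java
import java.util.Collection;

// Sketch of the proposed metric's semantics: the sum of (fromTimestamp - lastQueueDate).
final class TotalQueuedDurationSketch {

    // Hypothetical stand-in for FlowFile; only the last-queue timestamp is needed.
    interface Queued {
        Long getLastQueueDate();
    }

    // One term per entry; entries without a last-queue timestamp are skipped.
    static long totalQueuedDuration(Collection<? extends Queued> queue, long fromTimestamp) {
        long total = 0L;
        for (Queued item : queue) {
            Long lastQueueDate = item.getLastQueueDate();
            if (lastQueueDate != null) {
                total += fromTimestamp - lastQueueDate;
            }
        }
        return total;
    }
}
```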
##########
File path:
nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
##########
@@ -93,6 +93,18 @@
QueueSize size();
+ /**
+ * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+ * @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
+ */
+ long getTotalActiveQueuedDuration(long fromTimestamp);
+
+ /**
+ * @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
+ * @return the maximum time in milliseconds that any FlowFile within this queue has been in this queue.
+ */
+ long getMaxActiveQueuedDuration(long fromTimestamp);
Review comment:
I'm not sure what it means for the max active queued duration to take a
timestamp into account. It would make much more sense to me for this method
to accept no arguments at all and simply use the current time, or,
alternatively, to return the timestamp of the longest-queued FlowFile.
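A sketch of the two suggested signatures on a hypothetical `QueueAgeMetrics` interface (not part of NiFi). Both appear in one interface only for compactness; the comment proposes one or the other. Here the no-argument duration is derived from the oldest timestamp and measured against `System.currentTimeMillis()`, returning 0 for an empty queue, which matches what the diff's `orElse(fromTimestamp)` yields.

```java
import java.util.OptionalLong;

// Hypothetical API shapes for the method under review.
interface QueueAgeMetrics {

    // Alternative: expose the raw timestamp of the longest-queued entry,
    // empty if nothing is queued.
    OptionalLong getOldestQueuedTimestamp();

    // Alternative: no argument; the duration is measured against the current time.
    default long getMaxActiveQueuedDuration() {
        OptionalLong oldest = getOldestQueuedTimestamp();
        return oldest.isPresent() ? System.currentTimeMillis() - oldest.getAsLong() : 0L;
    }
}
```

Returning the timestamp leaves the choice of "now" to the caller, which avoids the ambiguity of passing an arbitrary reference time into the queue.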
----------------------------------------------------------------
This is an automated message from the Apache Git Service.