markap14 commented on a change in pull request #4780:
URL: https://github.com/apache/nifi/pull/4780#discussion_r630464003
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
##########
@@ -917,10 +954,81 @@ public SwapSummary recoverSwappedFlowFiles() {
logger.info("Recovered {} swap files for {} in {} millis",
swapLocations.size() - failures, this, millis);
}
- return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims);
+ // minSwappedQueueDate and totalSwappedQueueDate within this
particular StandardSwapSummary are not ultimately used by the FlowController.
However,
+ // it can't hurt to set them here accurately in case they ever are.
+ return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims, minSwappedQueueDate,
totalSwappedQueueDate);
}
+ public long getMinLastQueueDate() {
+ readLock.lock();
+ try {
+ // We want the oldest timestamp, which will be the min
+ boolean seen = false;
+ Long min = null;
+ for (FlowFileRecord flowFileRecord : activeQueue) {
+ Long lastQueueDate = flowFileRecord.getLastQueueDate();
+ if (lastQueueDate != null) {
+ if (!seen || lastQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = lastQueueDate;
+ }
+ }
+ }
+
+ for (FlowFileRecord flowFileRecord : swapQueue) {
+ Long lastQueueDate = flowFileRecord.getLastQueueDate();
+ if (lastQueueDate != null) {
+ if (!seen || lastQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = lastQueueDate;
+ }
+ }
+ }
+
+ for(Long minSwapQueueDate: minQueueDateInSwapLocation.values()) {
+ if(!seen || minSwapQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = minSwapQueueDate;
+ }
+ }
+ return seen ? min : 0L;
+ } finally {
+ readLock.unlock("Get Min Last Queue Date");
+ }
+ }
+
+ public long getTotalQueuedDuration(long fromTimestamp) {
+ readLock.lock();
+ try {
+ long sum = 0L;
+ for (FlowFileRecord flowFileRecord : activeQueue) {
+ if ((flowFileRecord.getLastQueueDate() != null)) {
Review comment:
getLastQueueDate() is guaranteed non-null here.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
##########
@@ -917,10 +954,81 @@ public SwapSummary recoverSwappedFlowFiles() {
logger.info("Recovered {} swap files for {} in {} millis",
swapLocations.size() - failures, this, millis);
}
- return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims);
+ // minSwappedQueueDate and totalSwappedQueueDate within this
particular StandardSwapSummary are not ultimately used by the FlowController.
However,
+ // it can't hurt to set them here accurately in case they ever are.
+ return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims, minSwappedQueueDate,
totalSwappedQueueDate);
}
+ public long getMinLastQueueDate() {
+ readLock.lock();
+ try {
+ // We want the oldest timestamp, which will be the min
+ boolean seen = false;
Review comment:
I find the logic here a bit confusing. I think it can be simplified
quite a bit by using:
```
long min = 0;
for (FlowFileRecord flowFileRecord : activeQueue) {
long lastQueueDate = flowFileRecord.getLastQueueDate();
if (min == 0 || lastQueueDate < min) {
min = lastQueueDate;
}
}
```
Since the next part of the code repeats the same logic, it probably
makes sense to extract a method that takes an `Iterable<FlowFileRecord>` and
returns the minimum. Then it could be as simple as:
```
long min = getOldestQueueDate(activeQueue);
min = Math.min(min, getOldestQueueDate(swapQueue));
for (Long minSwapQueueDate : minQueueDateInSwapLocation.values()) {
min = Math.min(min, minSwapQueueDate);
}
return min;
```
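One thing to watch with the snippet above: if 0 is used as the "nothing queued" default, a plain `Math.min` returns 0 whenever any one source is empty, even if the others hold real timestamps. A minimal sketch of one way to handle that, assuming 0 is never a valid queue date. The class and method names here (`OldestQueueDate`, `oldest`, `minIgnoringEmpty`) are hypothetical and not part of the PR, and plain `Long` values stand in for `FlowFileRecord.getLastQueueDate()`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the PR: illustrates the 0-as-"empty" convention.
final class OldestQueueDate {

    // Returns the smallest timestamp in the collection, or 0 if it is empty.
    static long oldest(final Iterable<Long> queueDates) {
        long min = 0L;
        for (final long queueDate : queueDates) {
            if (min == 0L || queueDate < min) {
                min = queueDate;
            }
        }
        return min;
    }

    // Combines two results where 0 means "no value", so that an empty source
    // does not drag the overall minimum down to 0.
    static long minIgnoringEmpty(final long a, final long b) {
        if (a == 0L) {
            return b;
        }
        if (b == 0L) {
            return a;
        }
        return Math.min(a, b);
    }

    public static void main(final String[] args) {
        final List<Long> active = Arrays.asList(300L, 100L, 200L);
        final List<Long> swap = Collections.emptyList();

        long min = oldest(active);
        min = minIgnoringEmpty(min, oldest(swap));
        System.out.println(min); // prints 100
    }
}
```

With a plain `Math.min(min, oldest(swap))` the same input would yield 0, because the empty swap queue reports 0.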
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
##########
@@ -513,6 +525,30 @@ public QueueSize size() {
return totalSize.get();
}
+ @Override
+ public long getTotalQueuedDuration(long fromTimestamp) {
+ long sum = 0L;
+ for (QueuePartition queuePartition : queuePartitions) {
+ long totalActiveQueuedDuration =
queuePartition.getTotalActiveQueuedDuration(fromTimestamp);
+ sum += totalActiveQueuedDuration;
+ }
+ return sum;
+ }
+
+ @Override
+ public long getMinLastQueueDate() {
+ boolean seen = false;
+ long min = 0;
+ for (QueuePartition queuePartition : queuePartitions) {
+ long minLastQueueDate = queuePartition.getMinLastQueueDate();
+ if (!seen || minLastQueueDate < min) {
Review comment:
Again, I think we can get rid of the extraneous `seen` variable:
```
if (min == 0 || minLastQueueDate < min) {
min = minLastQueueDate;
}
...
return min;
```
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
##########
@@ -86,6 +90,8 @@ public Object getFieldValue(final String fieldName) {
public static SwapSummary getSwapSummary(final Record record, final
ResourceClaimManager claimManager) {
final int flowFileCount = (Integer)
record.getFieldValue(SwapSchema.FLOWFILE_COUNT);
final long flowFileSize = (Long)
record.getFieldValue(SwapSchema.FLOWFILE_SIZE);
+ final long minLastQueueDate = (Long)
record.getFieldValue(SwapSchema.MIN_LAST_QUEUE_DATE);
Review comment:
Cannot cast these from `Long` to `long` because they may be `null`.
Any swap files that were written out in a previous version will not have
these fields. So these need to be checked for `null`, and if a value is `null`
it can default to 0 perhaps?
I tried starting up with swap files generated from the `main` branch just to
verify, and it resulted in a `NullPointerException` on startup, which caused
NiFi to fail to start.
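A minimal sketch of the kind of null check meant here, under assumptions: a plain `Map` stands in for the `Record`, the field names are made up for illustration, and `longOrDefault` is a hypothetical helper rather than an existing NiFi method:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for reading optional swap-summary fields.
final class NullSafeFields {

    // Returns the field as a long, or the default when the field is absent (null).
    // The cast to Long is only evaluated on the non-null branch, so no
    // NullPointerException is thrown by auto-unboxing.
    static long longOrDefault(final Object fieldValue, final long defaultValue) {
        return fieldValue == null ? defaultValue : (Long) fieldValue;
    }

    public static void main(final String[] args) {
        // Simulates a record written by an older version: the new field is missing.
        final Map<String, Object> oldRecord = new HashMap<>();
        oldRecord.put("flowFileSize", 2048L);

        final long flowFileSize = longOrDefault(oldRecord.get("flowFileSize"), 0L);
        final long minLastQueueDate = longOrDefault(oldRecord.get("minLastQueueDate"), 0L);

        System.out.println(flowFileSize);     // prints 2048
        System.out.println(minLastQueueDate); // prints 0
    }
}
```

Defaulting to 0 fits the convention used elsewhere in this change, where 0 means no queue date is known.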
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
##########
@@ -98,6 +98,30 @@ public QueueSize size() {
return new QueueSize(flowFiles.size() + unacknowledgedCount.get(),
totalBytes.get());
}
+ @Override
+ public long getTotalQueuedDuration(long fromTimestamp) {
+ long sum = 0L;
+ for (FlowFileRecord flowFileRecord : flowFiles) {
+ long l = fromTimestamp - flowFileRecord.getLastQueueDate();
+ sum += l;
+ }
+ return sum;
+ }
+
+ @Override
+ public long getMinLastQueueDate() {
+ boolean seen = false;
Review comment:
Again, I think we can avoid this extraneous `seen` variable as indicated
above.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
##########
@@ -917,10 +954,81 @@ public SwapSummary recoverSwappedFlowFiles() {
logger.info("Recovered {} swap files for {} in {} millis",
swapLocations.size() - failures, this, millis);
}
- return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims);
+ // minSwappedQueueDate and totalSwappedQueueDate within this
particular StandardSwapSummary are not ultimately used by the FlowController.
However,
+ // it can't hurt to set them here accurately in case they ever are.
+ return new StandardSwapSummary(new QueueSize(swapFlowFileCount,
swapByteCount), maxId, resourceClaims, minSwappedQueueDate,
totalSwappedQueueDate);
}
+ public long getMinLastQueueDate() {
+ readLock.lock();
+ try {
+ // We want the oldest timestamp, which will be the min
+ boolean seen = false;
+ Long min = null;
+ for (FlowFileRecord flowFileRecord : activeQueue) {
+ Long lastQueueDate = flowFileRecord.getLastQueueDate();
+ if (lastQueueDate != null) {
+ if (!seen || lastQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = lastQueueDate;
+ }
+ }
+ }
+
+ for (FlowFileRecord flowFileRecord : swapQueue) {
+ Long lastQueueDate = flowFileRecord.getLastQueueDate();
+ if (lastQueueDate != null) {
+ if (!seen || lastQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = lastQueueDate;
+ }
+ }
+ }
+
+ for(Long minSwapQueueDate: minQueueDateInSwapLocation.values()) {
+ if(!seen || minSwapQueueDate.compareTo(min) < 0) {
+ seen = true;
+ min = minSwapQueueDate;
+ }
+ }
+ return seen ? min : 0L;
+ } finally {
+ readLock.unlock("Get Min Last Queue Date");
+ }
+ }
+
+ public long getTotalQueuedDuration(long fromTimestamp) {
+ readLock.lock();
+ try {
+ long sum = 0L;
+ for (FlowFileRecord flowFileRecord : activeQueue) {
+ if ((flowFileRecord.getLastQueueDate() != null)) {
+ long l = fromTimestamp - flowFileRecord.getLastQueueDate();
+ sum += l;
+ }
+ }
+
+ for (FlowFileRecord flowFileRecord : swapQueue) {
+ if ((flowFileRecord.getLastQueueDate() != null)) {
Review comment:
getLastQueueDate() is guaranteed non-null here.