exceptionfactory commented on code in PR #7027:
URL: https://github.com/apache/nifi/pull/7027#discussion_r1139517753
##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -761,6 +763,7 @@ public void listByTrackingTimestamps(final ProcessContext
context, final Process
}
if (entityList == null || entityList.isEmpty()) {
+ getLogger().debug(String.format("There is no data to list with the
criteria minTimestampToListMillis=%d. Yielding.", minTimestampToListMillis));
Review Comment:
```suggestion
getLogger().debug("No data found matching minimum timestamp
[{}]: yielding", minTimestampToListMillis));
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -571,6 +571,7 @@ public void listByNoTracking(final ProcessContext context,
final ProcessSession
}
if (entityList == null || entityList.isEmpty()) {
+ getLogger().debug("There is no data to list. Yielding.");
Review Comment:
```suggestion
getLogger().debug("No data found: yielding");
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -817,11 +820,20 @@ public void listByTrackingTimestamps(final ProcessContext
context, final Process
* - If we have not eclipsed the minimal listing lag needed
due to being triggered too soon after the last run
* - The latest listed entity timestamp is equal to the last
processed time, meaning we handled those items originally passed over. No need
to process it again.
*/
- final long listingLagNanos =
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
- if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
- ||
(latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
- &&
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
- .allMatch(entity ->
latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+ final long listingLagNanos =
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+ final boolean minimalListingLagNotPassed = currentRunTimeNanos
- lastRunTimeNanos < listingLagNanos;
+
+ if (minimalListingLagNotPassed) {
+ getLogger().debug("Minimal listing lag has not passed.
Yielding.");
Review Comment:
```suggestion
getLogger().debug("Minimal listing lag not passed:
yielding");
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -738,6 +739,7 @@ public void listByTrackingTimestamps(final ProcessContext
context, final Process
}
justElectedPrimaryNode = false;
if (noUpdateRequired) {
+ getLogger().debug("Updating the timestamp of the last
listed entry is not required. Yielding.");
Review Comment:
```suggestion
getLogger().debug("No update required for last listed
entity: yielding");
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -817,11 +820,20 @@ public void listByTrackingTimestamps(final ProcessContext
context, final Process
* - If we have not eclipsed the minimal listing lag needed
due to being triggered too soon after the last run
* - The latest listed entity timestamp is equal to the last
processed time, meaning we handled those items originally passed over. No need
to process it again.
*/
- final long listingLagNanos =
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
- if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
- ||
(latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
- &&
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
- .allMatch(entity ->
latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+ final long listingLagNanos =
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+ final boolean minimalListingLagNotPassed = currentRunTimeNanos
- lastRunTimeNanos < listingLagNanos;
+
+ if (minimalListingLagNotPassed) {
+ getLogger().debug("Minimal listing lag has not passed.
Yielding.");
+ context.yield();
+ return;
+ }
+
+ final boolean latestListedEntryIsUpToDate =
latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
+ &&
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream().allMatch(entity
-> latestIdentifiersProcessed.contains(entity.getIdentifier()));
+
+ if (latestListedEntryIsUpToDate) {
+ getLogger().debug("Already listed the latest entry.
Yielding.");
Review Comment:
```suggestion
getLogger().debug("Latest entry already listed with
timestamp [{}]: yielding", latestListedEntryTimestampThisCycleMillis);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]