exceptionfactory commented on code in PR #7027:
URL: https://github.com/apache/nifi/pull/7027#discussion_r1139517753


##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -761,6 +763,7 @@ public void listByTrackingTimestamps(final ProcessContext 
context, final Process
         }
 
         if (entityList == null || entityList.isEmpty()) {
+            getLogger().debug(String.format("There is no data to list with the 
criteria minTimestampToListMillis=%d. Yielding.", minTimestampToListMillis));

Review Comment:
   ```suggestion
               getLogger().debug("No data found matching minimum timestamp 
[{}]: yielding", minTimestampToListMillis));
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -571,6 +571,7 @@ public void listByNoTracking(final ProcessContext context, 
final ProcessSession
         }
 
         if (entityList == null || entityList.isEmpty()) {
+            getLogger().debug("There is no data to list. Yielding.");

Review Comment:
   ```suggestion
               getLogger().debug("No data found: yielding");
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -817,11 +820,20 @@ public void listByTrackingTimestamps(final ProcessContext 
context, final Process
                  *   - If we have not eclipsed the minimal listing lag needed 
due to being triggered too soon after the last run
                  *   - The latest listed entity timestamp is equal to the last 
processed time, meaning we handled those items originally passed over. No need 
to process it again.
                  */
-                final long  listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
-                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
-                        || 
(latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
-                        && 
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
-                                .allMatch(entity -> 
latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+                final long listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+                final boolean minimalListingLagNotPassed = currentRunTimeNanos 
- lastRunTimeNanos < listingLagNanos;
+
+                if (minimalListingLagNotPassed) {
+                    getLogger().debug("Minimal listing lag has not passed. 
Yielding.");

Review Comment:
   ```suggestion
                       getLogger().debug("Minimal listing lag not passed: 
yielding");
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -738,6 +739,7 @@ public void listByTrackingTimestamps(final ProcessContext 
context, final Process
                 }
                 justElectedPrimaryNode = false;
                 if (noUpdateRequired) {
+                    getLogger().debug("Updating the timestamp of the last 
listed entry is not required. Yielding.");

Review Comment:
   ```suggestion
                       getLogger().debug("No update required for last listed 
entity: yielding");
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java:
##########
@@ -817,11 +820,20 @@ public void listByTrackingTimestamps(final ProcessContext 
context, final Process
                  *   - If we have not eclipsed the minimal listing lag needed 
due to being triggered too soon after the last run
                  *   - The latest listed entity timestamp is equal to the last 
processed time, meaning we handled those items originally passed over. No need 
to process it again.
                  */
-                final long  listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
-                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
-                        || 
(latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
-                        && 
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
-                                .allMatch(entity -> 
latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+                final long listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+                final boolean minimalListingLagNotPassed = currentRunTimeNanos 
- lastRunTimeNanos < listingLagNanos;
+
+                if (minimalListingLagNotPassed) {
+                    getLogger().debug("Minimal listing lag has not passed. 
Yielding.");
+                    context.yield();
+                    return;
+                }
+
+                final boolean latestListedEntryIsUpToDate = 
latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
+                        && 
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream().allMatch(entity
 -> latestIdentifiersProcessed.contains(entity.getIdentifier()));
+
+                if (latestListedEntryIsUpToDate) {
+                    getLogger().debug("Already listed the latest entry. 
Yielding.");

Review Comment:
   ```suggestion
                       getLogger().debug("Latest entry already listed with 
timestamp [{}]: yielding", latestListedEntryTimestampThisCycleMillis);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to