markap14 commented on a change in pull request #5593:
URL: https://github.com/apache/nifi/pull/5593#discussion_r777515747



##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
##########
@@ -163,4 +163,8 @@
      * @return the configured name of this processor
      */
     String getName();
+
+    boolean isRetriedRelationship(Relationship relationship);
+
+    int getRetryCounts();

Review comment:
       Again, should be `getRetryCount()` -- only a single count.
   Need to make sure we add JavaDocs.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {

Review comment:
       `synchronized` keyword is unnecessary here.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
+
+        flowFilesIn--;
+        contentSizeIn -= record.getOriginal().getSize();
+        flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+        contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+    }
+
+    private void clearEvents(final FlowFileRecord flowFileRecord) {
+        Set<ProvenanceEventRecord> events = provenanceReporter.getEvents().stream()
+                .filter(event -> event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())))
+                .collect(Collectors.toSet());
+
+        for (ProvenanceEventRecord event : events) {
+            provenanceReporter.remove(event);
+        }
+
+        forkEventBuilders.remove(flowFileRecord);
+    }
+
+    private long calculateBackoffTime(int retryCounts, final long maxBackoffPeriod, final long penalizationTime) {
+        if (retryCounts == 0) {
+            return penalizationTime;
+        }
+        long current = calculateBackoffTime(--retryCounts, maxBackoffPeriod, penalizationTime) * 2;

Review comment:
       Not sure that it makes sense to use a recursive call here. And I would avoid decrementing the incoming argument during the recursive call. I *think* what this is intended to do is just return twice the previous penalization period, right? Perhaps using a loop to iterate over `for (int i = 0; i < retryCount; i++) {`... would make things clearer?
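A minimal sketch of the loop-based alternative suggested above. The method shape mirrors `calculateBackoffTime()` from the PR, but this standalone class, and the capping at `maxBackoffPeriod`, are assumptions (the original snippet is truncated before any cap is applied).

```java
public class BackoffSketch {

    // Doubles the penalization period once per completed retry,
    // without recursion and without mutating the incoming argument.
    public static long calculateBackoffTime(final int retryCount, final long maxBackoffPeriod, final long penalizationPeriod) {
        long backoff = penalizationPeriod;
        for (int i = 0; i < retryCount; i++) {
            backoff *= 2;
            if (backoff >= maxBackoffPeriod) {
                return maxBackoffPeriod;
            }
        }
        return backoff;
    }
}
```

The loop makes the "double the previous penalization period" intent explicit and keeps the parameter `final`.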

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }

Review comment:
       Not sure that this logic is correct for outbound content size if some of 
the FlowFiles are auto-terminated, because we would not have incremented the 
outbound content size for them.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
##########
@@ -308,4 +314,48 @@ public void setDefaultSchedulingPeriod(Map<String, String> defaultSchedulingPeri
         this.defaultSchedulingPeriod = defaultSchedulingPeriod;
     }
 
+    @ApiModelProperty(
+            value = "Overall number of retries."
+    )
+    public Integer getRetryCounts() {

Review comment:
       retryCounts again could be retryCount

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -157,6 +157,11 @@
     private final int hashCode;
     private volatile boolean hasActiveThreads = false;
 
+    private final AtomicInteger retryCounts;

Review comment:
       And again :)

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {

Review comment:
       No need for this to be synchronized - you've already got the appropriate 
synchronization mechanisms in place via the AtomicReference (or volatile 
keyword)

##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
##########
@@ -163,4 +163,8 @@
      * @return the configured name of this processor
      */
     String getName();
+
+    boolean isRetriedRelationship(Relationship relationship);

Review comment:
       Not a huge deal here but the name here reads a bit awkwardly for me. 
`isRetriedRelationship` makes "retried" an adjective. Might make more sense to 
call it `isRelationshipRetried` - so that you're essentially asking whether or 
not this relationship should be retried.
   In either case, though, we need to ensure that we add JavaDocs for any 
method added to `ProcessContext`.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -157,6 +157,11 @@
     private final int hashCode;
     private volatile boolean hasActiveThreads = false;
 
+    private final AtomicInteger retryCounts;
+    private final AtomicReference<Set<String>> retriedRelationships;
+    private final AtomicReference<BackoffMechanism> backoffMechanism;
+    private final AtomicReference<String> maxBackoffPeriod;

Review comment:
       All of these atomic variables would perhaps serve best as `volatile` values. I don't see anywhere that we need to perform an atomic compare-and-swap (CAS) operation.
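A simplified stand-in (not the actual `StandardProcessorNode`) illustrating the suggestion: when every access is a plain get or set, a `volatile` field gives the same visibility guarantees as an `AtomicInteger`/`AtomicReference`, and no compare-and-swap is needed.

```java
public class VolatileRetrySketch {
    private volatile int retryCount;             // instead of AtomicInteger
    private volatile String maxBackoffPeriod;    // instead of AtomicReference<String>

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(final int retryCount) {
        this.retryCount = retryCount;            // a volatile write is atomic for int
    }

    public String getMaxBackoffPeriod() {
        return maxBackoffPeriod;
    }

    public void setMaxBackoffPeriod(final String maxBackoffPeriod) {
        this.maxBackoffPeriod = maxBackoffPeriod;
    }
}
```

Volatile writes and reads establish the same happens-before ordering as the atomic wrappers for plain set/get, so the `synchronized` keyword on these setters also becomes unnecessary.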

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);

Review comment:
       When we set a member variable to some mutable object like this Set, it's 
important to consider what would happen if the caller (or some other block of 
code) were to change that variable. That would inadvertently modify the 
internal state of this Processor, which should not be allowed. So we might want 
to create a defensive copy of the Set. So we could use something like:
   ```
   this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
   ```
   This also ensures that we never have a `null` collection. So there's no need to check for that in the `getRetriedRelationships()` and `isRetriedRelationship()` methods.
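A runnable sketch of the defensive-copy approach described above. The field is `volatile` here as a stand-in for the PR's `AtomicReference`, and the class name is illustrative, not an actual NiFi type.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Stand-in class showing the defensive copy: the field can never be null,
// and later mutation of the caller's Set has no effect on internal state.
public class RetriedRelationshipsSketch {
    private volatile Set<String> retriedRelationships = Collections.emptySet();

    public void setRetriedRelationships(final Set<String> retriedRelationships) {
        // Copy on write-in; wrap unmodifiable so getters cannot leak mutable state either.
        this.retriedRelationships = (retriedRelationships == null)
                ? Collections.emptySet()
                : Collections.unmodifiableSet(new HashSet<>(retriedRelationships));
    }

    public Set<String> getRetriedRelationships() {
        return retriedRelationships;             // no null check required
    }

    public boolean isRetriedRelationship(final String relationshipName) {
        return relationshipName != null && retriedRelationships.contains(relationshipName);
    }
}
```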

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism.set(backoffMechanism);
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return maxBackoffPeriod.get();
+    }
+
+    @Override
+    public synchronized void setMaxBackoffPeriod(String maxBackoffPeriod) {

Review comment:
       `synchronized` keyword is unnecessary here.

##########
File path: nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java
##########
@@ -44,6 +44,11 @@
     private Set<String> autoTerminatedRelationships;
     private ScheduledState scheduledState;
 
+    private Integer retryCounts;

Review comment:
       Should call this `retryCount` -- there is only one count :)

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism.set(backoffMechanism);
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return maxBackoffPeriod.get();
+    }
+
+    @Override
+    public synchronized void setMaxBackoffPeriod(String maxBackoffPeriod) {
+        this.maxBackoffPeriod.set(maxBackoffPeriod);

Review comment:
       This is expected to be parsed later into a time period. Best to validate 
the input before simply storing it. See `setYieldPeriod()` for an example. We 
should ensure not only that it's a valid time period syntactically but also 
that it's a possible value
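A sketch of a validating setter along the lines the reviewer suggests. The real code would presumably reuse the same time-period validation as `setYieldPeriod()` (e.g. NiFi's `FormatUtils`); the regex below is a simplified stand-in, not NiFi's actual pattern, and the class name is hypothetical.

```java
import java.util.regex.Pattern;

public class MaxBackoffPeriodSketch {
    // Simplified stand-in pattern: accepts strings like "500 ms", "10 secs", "5 mins".
    private static final Pattern TIME_PERIOD =
            Pattern.compile("\\d+\\s*(ns|ms|s|sec|secs|min|mins|hr|hrs|day|days)");

    private volatile String maxBackoffPeriod;

    public void setMaxBackoffPeriod(final String maxBackoffPeriod) {
        // Fail fast on unparseable input instead of storing a string that
        // blows up later, when it is finally converted to a duration.
        if (maxBackoffPeriod == null || !TIME_PERIOD.matcher(maxBackoffPeriod.trim()).matches()) {
            throw new IllegalArgumentException(
                    "Max Backoff Period must be a valid time period, e.g. '10 secs', but was: " + maxBackoffPeriod);
        }
        this.maxBackoffPeriod = maxBackoffPeriod;
    }

    public String getMaxBackoffPeriod() {
        return maxBackoffPeriod;
    }
}
```

Validating at set time keeps invalid configuration from ever reaching the scheduler, mirroring how other period settings on the node behave.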

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
         return desiredState;
     }
 
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {

Review comment:
       `synchronized` keyword is unnecessary here.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
##########
@@ -835,6 +837,33 @@ public Response updateProcessor(
             }
         }
 
+        final ProcessorConfigDTO processorConfig = requestProcessorDTO.getConfig();
+        if (processorConfig != null) {
+            if (processorConfig.getRetryCounts() != null && processorConfig.getRetryCounts() < 0) {
+                throw new IllegalArgumentException("Retry Counts should not be less than zero.");
+            }
+
+            if (processorConfig.getBackoffMechanism() != null) {
+                try {
+                    BackoffMechanism.valueOf(processorConfig.getBackoffMechanism());
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Backoff Mechanism " + processorConfig.getBackoffMechanism() + " is invalid.");
+                }
+            }
+
+            if (processorConfig.getMaxBackoffPeriod() != null && !FormatUtils.TIME_DURATION_PATTERN.matcher(processorConfig.getMaxBackoffPeriod()).matches()) {
+                throw new IllegalArgumentException("Max Backoff Period should be specified as time, for example 5 mins");
+            }
+
+            if (processorConfig.getRetriedRelationships() != null && processorConfig.getAutoTerminatedRelationships() != null) {
+                for (String relationship : processorConfig.getAutoTerminatedRelationships()) {
+                    if (processorConfig.getRetriedRelationships().stream().anyMatch(rel -> rel.equals(relationship))) {
+                        throw new IllegalArgumentException("Auto terminated relationship " + relationship + " can not be retried");

Review comment:
       Why do we have this restriction? It definitely makes sense to allow auto-terminated relationships to be retried. You could, for instance, configure your flow so that it retries a 'failure' relationship 10 times. After 10 times, if it is still unable to deliver to the destination, auto-terminate it. I think this is likely to be a common use case.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
##########
@@ -2800,7 +2809,299 @@ public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimRefer
         assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile)));
     }
 
+    @Test
+    public void testWhenInRetryAttributeIsAdded() {
+        final Connectable processor = createProcessorConnectable();
+        configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "1 ms", 1L);
+
+        StandardProcessSession session = createSessionForRetry(processor);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        FlowFile ff1 = session.get();
+        assertNotNull(ff1);
+        session.transfer(flowFileRecord, relationship);
+        session.checkpoint();

Review comment:
       Should not call `checkpoint()` here - this is something that is handled 
internally by the ProcessSession when `commit()` gets called. 

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,

Review comment:
       `updateFlowFileRecord` is a very vague name and doesn't tell me much of 
anything about what this method is doing.

##########
File path: nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RetryIT.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class RetryIT extends NiFiSystemIT {
+    private static final int RETRY_COUNTS = 2;
+    private static final String EXPECTED_NO_RETRY_COUNTS = null;
+    private static final String EXPECTED_FIRST_RETRY = String.valueOf(RETRY_COUNTS - 1);
+    private static final String EXPECTED_SECOND_RETRY = String.valueOf(RETRY_COUNTS);
+
+    @Test
+    public void testRetryHappensTwiceThenFinishes() throws NiFiClientException, IOException, InterruptedException {
+        //Create a GenerateFlowFile processor
+        final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
+
+        //Create a PassThrough processor
+        final ProcessorEntity passThrough = getClientUtil().createProcessor("PassThrough");
+
+        //Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms
+        final ProcessorConfigDTO config = new ProcessorConfigDTO();
+        config.setRetryCounts(RETRY_COUNTS);
+        config.setMaxBackoffPeriod("1 ms");
+        config.setBackoffMechanism("PENALIZE_FLOWFILE");
+        config.setRetriedRelationships(Collections.singleton("success"));
+        config.setPenaltyDuration("1 ms");
+        getClientUtil().updateProcessorConfig(passThrough, config);

Review comment:
       This block appears to be super repetitive. Might consider creating a 
simple method:
   ```
   void enableRetries(ProcessorEntity processor, String relationship);
   ```
   that performs all of this logic.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,

Review comment:
       `adjustProcessorStatistics` is also quite vague - how is it adjusting 
them? Why is it adjusting them this way? Perhaps it should be called something 
like `adjustConnectableStatsForRetry`?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {

Review comment:
       This is not a very clean separation of concerns here, to add the 
retry-related methods only to ProcessorNode, and then have instances where we 
have to check if a `Connectable` is a `ProcessorNode` and only then treat it 
differently. Conceptually, it makes sense that any `Connectable` object could 
retry, even though they may not - especially initially. But I can envision this 
eventually being used to improve Site-to-Site connections, etc.
   
   It would make more sense to add the retry-related methods to `Connectable` 
and then implement `AbstractPort` and `StandardFunnel` to simply retry 0 for 
retry count, an empty collection for relationships that get retried, etc. 
There's nothing special about ProcessorNode as an entity that makes it uniquely 
suited for this. Should be on all Connectables.
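As a rough illustration of the suggested refactor, default methods on the interface keep ports and funnels trivial while processors override with real configuration. The types below (`Connectable`, the stubs) are simplified stand-ins, not NiFi's actual classes:

```java
import java.util.Collections;
import java.util.Set;

interface Connectable {
    // By default a Connectable never retries anything.
    default int getRetryCount() {
        return 0;
    }

    default Set<String> getRetriedRelationships() {
        return Collections.emptySet();
    }

    default boolean isRetriedRelationship(String relationship) {
        return getRetriedRelationships().contains(relationship);
    }
}

// Ports and funnels would simply inherit the do-nothing defaults...
class StandardFunnelStub implements Connectable {
}

// ...while a processor node overrides them with its configured retry behavior.
class ProcessorNodeStub implements Connectable {
    @Override
    public int getRetryCount() {
        return 3;
    }

    @Override
    public Set<String> getRetriedRelationships() {
        return Set.of("failure");
    }
}
```

With this shape, call sites can operate on any `Connectable` without instanceof checks against `ProcessorNode`.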

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();

Review comment:
       We should not be calling `clear()` here. This is attempting to modify 
internal state of another component, which could have unexpected consequences. 
And it will definitely fail in some cases.
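One way to honor this (purely an illustrative sketch, not NiFi's actual classes) is to keep all attribute changes on the builder's own defensive copy, so the record's internal map is never touched from outside:

```java
import java.util.HashMap;
import java.util.Map;

class FlowFileSketch {
    final Map<String, String> attributes;

    FlowFileSketch(Map<String, String> attributes) {
        // Defensive copy: outside callers cannot mutate this instance's state.
        this.attributes = new HashMap<>(attributes);
    }

    static class Builder {
        private final Map<String, String> attributes = new HashMap<>();

        // Copies the original's attributes into the builder's own map.
        Builder fromFlowFile(FlowFileSketch original) {
            attributes.putAll(original.attributes);
            return this;
        }

        Builder addAttribute(String key, String value) {
            attributes.put(key, value);
            return this;
        }

        FlowFileSketch build() {
            return new FlowFileSketch(attributes);
        }
    }
}
```

Because the builder only ever writes to its own copy, there is nothing to `clear()` on the original record.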

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
+
+        flowFilesIn--;
+        contentSizeIn -= record.getOriginal().getSize();
+        flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+        contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+    }
+
+    private void clearEvents(final FlowFileRecord flowFileRecord) {

Review comment:
       What events is this clearing? Perhaps `clearProvenanceEvents` makes more 
sense?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -2015,6 +2156,18 @@ public FlowFile penalize(FlowFile flowFile) {
         return newFile;
     }
 
+    public FlowFile penalize(FlowFile flowFile, long period, TimeUnit timeUnit) {

Review comment:
       Should probably update the previous penalize() call to just call `penalize(flowFile, context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);`, no? I.e., make use of this method, rather than repeating the logic.
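The delegation pattern being suggested looks roughly like the sketch below. `PenalizeSketch` and its fields are simplified stand-ins for the session and its context, not NiFi's real classes; the point is only that the no-period overload forwards to the new one:

```java
import java.util.concurrent.TimeUnit;

class PenalizeSketch {
    private final long defaultPenalizationMillis;

    PenalizeSketch(long defaultPenalizationMillis) {
        this.defaultPenalizationMillis = defaultPenalizationMillis;
    }

    // Existing single-argument form just forwards the configured period,
    // instead of duplicating the penalization logic.
    long penalize(String flowFileId) {
        return penalize(flowFileId, defaultPenalizationMillis, TimeUnit.MILLISECONDS);
    }

    // New overload holds the actual logic (here: normalize to milliseconds).
    long penalize(String flowFileId, long period, TimeUnit timeUnit) {
        return timeUnit.toMillis(period);
    }
}
```

Keeping the logic in one overload means a future change to penalization only needs to touch one method.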

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
+
+        flowFilesIn--;
+        contentSizeIn -= record.getOriginal().getSize();
+        flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+        contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+    }
+
+    private void clearEvents(final FlowFileRecord flowFileRecord) {
+        Set<ProvenanceEventRecord> events = provenanceReporter.getEvents().stream()
+                .filter(event -> event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())))
+                .collect(Collectors.toSet());
+
+        for (ProvenanceEventRecord event : events) {
+            provenanceReporter.remove(event);
+        }
+
+        forkEventBuilders.remove(flowFileRecord);
+    }
+
+    private long calculateBackoffTime(int retryCounts, final long maxBackoffPeriod, final long penalizationTime) {
+        if (retryCounts == 0) {
+            return penalizationTime;
+        }
+        long current = calculateBackoffTime(--retryCounts, maxBackoffPeriod, penalizationTime) * 2;
+        if (current >= maxBackoffPeriod) {
+            return maxBackoffPeriod;
+        }
+        return current;
Review comment:
       Simpler (and clearer) to just `return Math.min(current, maxBackoffPeriod);`? (Min, not max, since the backoff must be capped at `maxBackoffPeriod`.)
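For what it's worth, the recursion also collapses to a closed form: the backoff is `penalizationTime * 2^retryCount`, capped with `Math.min` so it never exceeds `maxBackoffPeriod`. A self-contained sketch (class and method names are illustrative, not the PR's code):

```java
public class BackoffCalc {
    static long calculateBackoffTime(int retryCount, long maxBackoffPeriod, long penalizationTime) {
        // Guard against left-shift overflow before computing penalizationTime * 2^retryCount.
        if (retryCount >= Long.numberOfLeadingZeros(penalizationTime)) {
            return maxBackoffPeriod;
        }
        // Math.min caps the doubled backoff at the configured maximum.
        return Math.min(penalizationTime << retryCount, maxBackoffPeriod);
    }

    public static void main(String[] args) {
        System.out.println(calculateBackoffTime(0, 10_000L, 500L));  // 500
        System.out.println(calculateBackoffTime(3, 10_000L, 500L));  // 4000
        System.out.println(calculateBackoffTime(10, 10_000L, 500L)); // 10000 (capped)
    }
}
```

The iterative form avoids both the recursion and the `>=` branch entirely.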

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {

Review comment:
       Rather than call `Map.get()` twice, the first time to compare to null, 
the second time to operate on the return value, should call `Map.get()` once 
and store value in a local variable. The call to `Map.get()` can be pretty 
expensive if repeated over & over, due to the hashing that happens there.
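The single-lookup pattern being asked for, shown on a plain `HashMap` (the map and method here are illustrative, not the session's fields):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SingleLookup {
    static int countChildren(Map<String, List<String>> forkEventBuilders, String flowFileId) {
        // One Map.get() call: store the result, then branch on null,
        // instead of hashing the key once for the null check and again for use.
        final List<String> childIds = forkEventBuilders.get(flowFileId);
        if (childIds == null) {
            return 0;
        }
        return childIds.size();
    }
}
```

The same reshaping applies to both `forkEventBuilders.get(...)` call sites in `isRetryNeeded` and `updateFlowFileRecord`.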

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
         checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
     }
 
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
+
+        flowFilesIn--;
+        contentSizeIn -= record.getOriginal().getSize();

Review comment:
       Am a little bit torn about this - if we take in a FlowFile and then retry it, should we consider it as a FlowFile that's come in? On the one hand I think we should, because if we process it 3 times, it should indicate that we had 3 FlowFiles come in. But at the same time, then we might show 5 FlowFiles in and 2 FlowFiles out if there are retries, which might also lead to more confusion.
   I think this is okay, but I believe that it warrants an update to the User Guide to explain that if a FlowFile is retried, it does not get counted toward the Input Counts.




-- 
This is an automated message from the Apache Git Service.