markap14 commented on a change in pull request #5593:
URL: https://github.com/apache/nifi/pull/5593#discussion_r777515747
##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
##########
@@ -163,4 +163,8 @@
* @return the configured name of this processor
*/
String getName();
+
+ boolean isRetriedRelationship(Relationship relationship);
+
+ int getRetryCounts();
Review comment:
Again, should be `getRetryCount()` -- only a single count.
Need to make sure we add JavaDocs.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+ @Override
+ public int getRetryCounts() {
+ return retryCounts.get();
+ }
+
+ @Override
+ public synchronized void setRetryCounts(int retryCounts) {
+ this.retryCounts.set(retryCounts);
+ }
+
+ @Override
+ public Set<String> getRetriedRelationships() {
+ if (retriedRelationships.get() == null) {
+ return new HashSet<>();
+ }
+ return retriedRelationships.get();
+ }
+
+ @Override
+ public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
Review comment:
`synchronized` keyword is unnecessary here.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
+
+        flowFilesIn--;
+        contentSizeIn -= record.getOriginal().getSize();
+        flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+        contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+    }
+
+    private void clearEvents(final FlowFileRecord flowFileRecord) {
+        Set<ProvenanceEventRecord> events = provenanceReporter.getEvents().stream()
+            .filter(event -> event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())))
+            .collect(Collectors.toSet());
+
+        for (ProvenanceEventRecord event : events) {
+            provenanceReporter.remove(event);
+        }
+
+        forkEventBuilders.remove(flowFileRecord);
+    }
+
+    private long calculateBackoffTime(int retryCounts, final long maxBackoffPeriod, final long penalizationTime) {
+        if (retryCounts == 0) {
+            return penalizationTime;
+        }
+        long current = calculateBackoffTime(--retryCounts, maxBackoffPeriod, penalizationTime) * 2;
Review comment:
Not sure that it makes sense to use a recursive call here. And I would
avoid decrementing the incoming argument during a call to recursion. I *think*
what this is intended to do is to just return twice the previous penalization
period, right? Perhaps using a loop to iterate over `for (int i=0; i <
retryCount; i++) {`... would make things clearer?
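For illustration, a minimal sketch of the loop-based alternative being suggested. The method and parameter names mirror the quoted diff; the capping behavior against `maxBackoffPeriod` is an assumption, since the quoted method is truncated before that parameter is used.

```java
// Illustrative sketch only, not the PR's actual implementation.
// Doubles the penalization period once per completed retry, capping the
// result at maxBackoffPeriod (the cap is an assumed behavior).
class BackoffSketch {
    static long calculateBackoffTime(final int retryCount, final long maxBackoffPeriod, final long penalizationTime) {
        long backoff = penalizationTime;
        for (int i = 0; i < retryCount; i++) {
            backoff *= 2;
            if (backoff >= maxBackoffPeriod) {
                return maxBackoffPeriod;
            }
        }
        return backoff;
    }
}
```

Compared with the recursive version, this avoids mutating the incoming argument and makes the doubling-per-retry intent immediately visible.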
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                           final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        final int numDestinations = context.getConnections(relationship).size();
+        final int multiplier = Math.max(1, numDestinations);
+        final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+        final List<String> childFlowFileIds = new ArrayList<>();
+        int contentSize = 0;
+
+        if (eventBuilder != null) {
+            childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+            for (String uuid : childFlowFileIds) {
+                contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+            }
+        }
Review comment:
Not sure that this logic is correct for outbound content size if some of
the FlowFiles are auto-terminated, because we would not have incremented the
outbound content size for them.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
##########
@@ -308,4 +314,48 @@ public void setDefaultSchedulingPeriod(Map<String, String> defaultSchedulingPeri
this.defaultSchedulingPeriod = defaultSchedulingPeriod;
}
+ @ApiModelProperty(
+ value = "Overall number of retries."
+ )
+ public Integer getRetryCounts() {
Review comment:
`retryCounts` again could be `retryCount`.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -157,6 +157,11 @@
private final int hashCode;
private volatile boolean hasActiveThreads = false;
+ private final AtomicInteger retryCounts;
Review comment:
And again :)
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+ @Override
+ public int getRetryCounts() {
+ return retryCounts.get();
+ }
+
+ @Override
+ public synchronized void setRetryCounts(int retryCounts) {
Review comment:
No need for this to be synchronized - you've already got the appropriate
synchronization mechanisms in place via the AtomicReference (or volatile
keyword)
##########
File path: nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
##########
@@ -163,4 +163,8 @@
* @return the configured name of this processor
*/
String getName();
+
+ boolean isRetriedRelationship(Relationship relationship);
Review comment:
Not a huge deal here but the name here reads a bit awkwardly for me.
`isRetriedRelationship` makes "retried" an adjective. Might make more sense to
call it `isRelationshipRetried` - so that you're essentially asking whether or
not this relationship should be retried.
In either case, though, we need to ensure that we add JavaDocs for any
method added to `ProcessContext`.
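As a rough sketch of what the renamed, documented additions might look like (the `Relationship` stub below exists only to keep the snippet self-contained; NiFi's real class lives in `org.apache.nifi.processor`, and the interface name here is illustrative):

```java
// Self-contained sketch; Relationship is stubbed for illustration.
class Relationship {
    private final String name;
    Relationship(final String name) { this.name = name; }
    String getName() { return name; }
}

// Illustrative interface standing in for the ProcessContext additions.
interface RetryConfiguredContext {
    /**
     * @param relationship the relationship to check
     * @return true if FlowFiles transferred to the given relationship are configured to be retried
     */
    boolean isRelationshipRetried(Relationship relationship);

    /**
     * @return the number of times a FlowFile is retried before being transferred downstream
     */
    int getRetryCount();
}
```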
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -157,6 +157,11 @@
private final int hashCode;
private volatile boolean hasActiveThreads = false;
+ private final AtomicInteger retryCounts;
+ private final AtomicReference<Set<String>> retriedRelationships;
+ private final AtomicReference<BackoffMechanism> backoffMechanism;
+ private final AtomicReference<String> maxBackoffPeriod;
Review comment:
All of these atomic variables would perhaps serve best as `volatile`
values. I don't see anywhere that we need to perform an atomic check-and-swap
(CAS) operations.
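A minimal sketch of the volatile-based alternative, assuming every access is a single read or write with no compare-and-swap (class name, enum values, and defaults below are illustrative, not NiFi's actual code):

```java
import java.util.Collections;
import java.util.Set;

// Illustrative sketch: plain volatile fields give the same visibility
// guarantees as Atomic* wrappers when no CAS operation is needed.
class RetryState {
    // Stub enum for illustration; NiFi's BackoffMechanism may differ.
    enum BackoffMechanism { PENALIZE_FLOWFILE, YIELD_PROCESSOR }

    private volatile int retryCount;
    private volatile Set<String> retriedRelationships = Collections.emptySet();
    private volatile BackoffMechanism backoffMechanism = BackoffMechanism.PENALIZE_FLOWFILE;
    private volatile String maxBackoffPeriod = "10 mins"; // illustrative default

    int getRetryCount() { return retryCount; }
    void setRetryCount(final int retryCount) { this.retryCount = retryCount; }

    Set<String> getRetriedRelationships() { return retriedRelationships; }
    void setRetriedRelationships(final Set<String> retried) {
        this.retriedRelationships = (retried == null) ? Collections.emptySet() : retried;
    }
}
```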
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+ @Override
+ public int getRetryCounts() {
+ return retryCounts.get();
+ }
+
+ @Override
+ public synchronized void setRetryCounts(int retryCounts) {
+ this.retryCounts.set(retryCounts);
+ }
+
+ @Override
+ public Set<String> getRetriedRelationships() {
+ if (retriedRelationships.get() == null) {
+ return new HashSet<>();
+ }
+ return retriedRelationships.get();
+ }
+
+ @Override
+ public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+ this.retriedRelationships.set(retriedRelationships);
Review comment:
When we set a member variable to some mutable object like this Set, it's
important to consider what would happen if the caller (or some other block of
code) were to change that variable. That would inadvertently modify the
internal state of this Processor, which should not be allowed. So we might want
to create a defensive copy of the Set. So we could use something like:
```
this.retriedRelationships = (retriedRelationships == null) ? Collections.emptySet() : new HashSet<>(retriedRelationships);
```
This also ensures that we never have a `null` collection, so there's no need to check for that in the `getRetriedRelationships()` and `isRetriedRelationship()` methods.
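A self-contained sketch of the defensive-copy pattern being suggested (class and field names are illustrative, not NiFi's actual code):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative: copy the incoming Set so later caller-side mutation cannot
// leak into this object's internal state, and never store null.
class RetriedRelationshipHolder {
    private volatile Set<String> retriedRelationships = Collections.emptySet();

    void setRetriedRelationships(final Set<String> retriedRelationships) {
        this.retriedRelationships = (retriedRelationships == null)
            ? Collections.emptySet()
            : Collections.unmodifiableSet(new HashSet<>(retriedRelationships));
    }

    Set<String> getRetriedRelationships() {
        return retriedRelationships; // never null, so no defensive check needed
    }

    boolean isRetriedRelationship(final String relationshipName) {
        return relationshipName != null && retriedRelationships.contains(relationshipName);
    }
}
```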
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism.set(backoffMechanism);
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return maxBackoffPeriod.get();
+    }
+
+    @Override
+    public synchronized void setMaxBackoffPeriod(String maxBackoffPeriod) {
Review comment:
`synchronized` keyword is unnecessary here.
##########
File path: nifi-api/src/main/java/org/apache/nifi/flow/VersionedProcessor.java
##########
@@ -44,6 +44,11 @@
private Set<String> autoTerminatedRelationships;
private ScheduledState scheduledState;
+ private Integer retryCounts;
Review comment:
Should call this `retryCount` -- there is only one count :)
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {
+        this.backoffMechanism.set(backoffMechanism);
+    }
+
+    @Override
+    public String getMaxBackoffPeriod() {
+        return maxBackoffPeriod.get();
+    }
+
+    @Override
+    public synchronized void setMaxBackoffPeriod(String maxBackoffPeriod) {
+        this.maxBackoffPeriod.set(maxBackoffPeriod);
Review comment:
This is expected to be parsed later into a time period. Best to validate
the input before simply storing it. See `setYieldPeriod()` for an example. We
should ensure not only that it's a valid time period syntactically but also
that it's a possible value
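As a rough illustration of validating before storing, here is a simplified sketch. The regex below is a stand-in, not NiFi's actual `FormatUtils.TIME_DURATION_PATTERN`, and the class name is hypothetical:

```java
import java.util.regex.Pattern;

// Simplified stand-in for NiFi's time-period validation; accepts values like
// "500 ms", "30 secs", "5 mins" and rejects anything else before storing.
class MaxBackoffValidator {
    private static final Pattern TIME_PERIOD = Pattern.compile(
        "\\d+\\s*(ns|nanos|ms|millis|s|secs?|seconds?|m|mins?|minutes?|h|hrs?|hours?|d|days?)",
        Pattern.CASE_INSENSITIVE);

    private volatile String maxBackoffPeriod;

    void setMaxBackoffPeriod(final String value) {
        // Reject syntactically invalid periods up front, rather than failing
        // later when the stored value is parsed.
        if (value == null || !TIME_PERIOD.matcher(value.trim()).matches()) {
            throw new IllegalArgumentException("Max Backoff Period must be a valid time period, e.g. '5 mins', got: " + value);
        }
        this.maxBackoffPeriod = value;
    }

    String getMaxBackoffPeriod() { return maxBackoffPeriod; }
}
```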
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
##########
@@ -1863,6 +1873,57 @@ public ScheduledState getDesiredState() {
return desiredState;
}
+    @Override
+    public int getRetryCounts() {
+        return retryCounts.get();
+    }
+
+    @Override
+    public synchronized void setRetryCounts(int retryCounts) {
+        this.retryCounts.set(retryCounts);
+    }
+
+    @Override
+    public Set<String> getRetriedRelationships() {
+        if (retriedRelationships.get() == null) {
+            return new HashSet<>();
+        }
+        return retriedRelationships.get();
+    }
+
+    @Override
+    public synchronized void setRetriedRelationships(Set<String> retriedRelationships) {
+        this.retriedRelationships.set(retriedRelationships);
+    }
+
+    @Override
+    public boolean isRetriedRelationship(final Relationship relationship) {
+        if (this.retriedRelationships.get() == null || relationship == null) {
+            return false;
+        } else {
+            return this.retriedRelationships.get().contains(relationship.getName());
+        }
+    }
+
+    @Override
+    public BackoffMechanism getBackoffMechanism() {
+        return backoffMechanism.get();
+    }
+
+    @Override
+    public synchronized void setBackoffMechanism(BackoffMechanism backoffMechanism) {
Review comment:
`synchronized` keyword is unnecessary here.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
##########
@@ -835,6 +837,33 @@ public Response updateProcessor(
}
}
+        final ProcessorConfigDTO processorConfig = requestProcessorDTO.getConfig();
+        if (processorConfig != null) {
+            if (processorConfig.getRetryCounts() != null && processorConfig.getRetryCounts() < 0) {
+                throw new IllegalArgumentException("Retry Counts should not be less than zero.");
+            }
+
+            if (processorConfig.getBackoffMechanism() != null) {
+                try {
+                    BackoffMechanism.valueOf(processorConfig.getBackoffMechanism());
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Backoff Mechanism " + processorConfig.getBackoffMechanism() + " is invalid.");
+                }
+            }
+
+            if (processorConfig.getMaxBackoffPeriod() != null && !FormatUtils.TIME_DURATION_PATTERN.matcher(processorConfig.getMaxBackoffPeriod()).matches()) {
+                throw new IllegalArgumentException("Max Backoff Period should be specified as time, for example 5 mins");
+            }
+
+            if (processorConfig.getRetriedRelationships() != null && processorConfig.getAutoTerminatedRelationships() != null) {
+                for (String relationship : processorConfig.getAutoTerminatedRelationships()) {
+                    if (processorConfig.getRetriedRelationships().stream().anyMatch(rel -> rel.equals(relationship))) {
+                        throw new IllegalArgumentException("Auto terminated relationship " + relationship + " can not be retried");
Review comment:
Why do we have this restriction? It definitely makes sense to allow
auto-terminated relationships to be retried. You could, for instance, configure
your flow so that it retries a 'failure' relationship 10 times. After 10
times, if still unable to deliver to the destination then auto-terminate it. I
think this is likely to be a common use case.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
##########
@@ -2800,7 +2809,299 @@ public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimRefer
         assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile)));
     }
+    @Test
+    public void testWhenInRetryAttributeIsAdded() {
+        final Connectable processor = createProcessorConnectable();
+        configureRetry(processor, 1, BackoffMechanism.PENALIZE_FLOWFILE, "1 ms", 1L);
+
+        StandardProcessSession session = createSessionForRetry(processor);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        FlowFile ff1 = session.get();
+        assertNotNull(ff1);
+        session.transfer(flowFileRecord, relationship);
+        session.checkpoint();
Review comment:
Should not call `checkpoint()` here - this is something that is handled
internally by the ProcessSession when `commit()` gets called.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
Review comment:
`updateFlowFileRecord` is a very vague name and doesn't tell me much of
anything about what this method is doing.
##########
File path: nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RetryIT.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class RetryIT extends NiFiSystemIT {
+    private static final int RETRY_COUNTS = 2;
+    private static final String EXPECTED_NO_RETRY_COUNTS = null;
+    private static final String EXPECTED_FIRST_RETRY = String.valueOf(RETRY_COUNTS - 1);
+    private static final String EXPECTED_SECOND_RETRY = String.valueOf(RETRY_COUNTS);
+
+    @Test
+    public void testRetryHappensTwiceThenFinishes() throws NiFiClientException, IOException, InterruptedException {
+        // Create a GenerateFlowFile processor
+        final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
+
+        // Create a PassThrough processor
+        final ProcessorEntity passThrough = getClientUtil().createProcessor("PassThrough");
+
+        // Configure the processor's success relationship to be retried twice with flow file penalization for a maximum of 1 ms
+        final ProcessorConfigDTO config = new ProcessorConfigDTO();
+        config.setRetryCounts(RETRY_COUNTS);
+        config.setMaxBackoffPeriod("1 ms");
+        config.setBackoffMechanism("PENALIZE_FLOWFILE");
+        config.setRetriedRelationships(Collections.singleton("success"));
+        config.setPenaltyDuration("1 ms");
+        getClientUtil().updateProcessorConfig(passThrough, config);
Review comment:
This block appears to be super repetitive. Might consider creating a
simple method:
```
void enableRetries(ProcessorEntity processor, String relationship);
```
that performs all of this logic.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
+        final FlowFileRecord newFile = builder.build();
+        record.setWorking(newFile, false);
+        return newFile;
+    }
+
+    private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
Review comment:
`adjustProcessorStatistics` is also quite vague - how is it adjusting
them? Why is it adjusting them this way? Perhaps it should be called something
like `adjustConnectableStatsForRetry`?
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
Review comment:
This is not a very clean separation of concerns here, to add the
retry-related methods only to ProcessorNode, and then have instances where we
have to check if a `Connectable` is a `ProcessorNode` and only then treat it
differently. Conceptually, it makes sense that any `Connectable` object could
retry, even though they may not - especially initially. But I can envision this
eventually being used to improve Site-to-Site connections, etc.
It would make more sense to add the retry-related methods to `Connectable`
and then implement `AbstractPort` and `StandardFunnel` to simply retry 0 for
retry count, an empty collection for relationships that get retried, etc.
There's nothing special about ProcessorNode as an entity that makes it uniquely
suited for this. Should be on all Connectables.
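One way to sketch the suggested shape (interface and method names below are illustrative; NiFi's real `Connectable` hierarchy is much larger than this):

```java
import java.util.Collections;
import java.util.Set;

// Illustrative sketch: retry accessors live on the common interface, with
// no-retry defaults that port- and funnel-like implementations inherit.
interface RetryCapableConnectable {
    default int getRetryCount() {
        return 0; // never retries unless overridden
    }

    default Set<String> getRetriedRelationships() {
        return Collections.emptySet();
    }

    default boolean isRelationshipRetried(final String relationshipName) {
        return getRetriedRelationships().contains(relationshipName);
    }
}

// A funnel-like component simply inherits the no-retry defaults unchanged.
class FunnelLike implements RetryCapableConnectable { }
```

With this shape, only processor-like implementations need to override anything, and session code can call the retry accessors on any connectable without instanceof checks.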
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+    private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                                  final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+        if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+            return false;
+        }
+
+        if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+            return retryCounts < processorNode.getRetryCounts();
+        }
+
+        if (forkEventBuilders.get(currentFlowFile) != null) {
+            for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+                if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                    return retryCounts < processorNode.getRetryCounts();
+                }
+            }
+        }
+        return false;
+    }
+
+    private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                                final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                                final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+        removeTemporaryClaim(record);
+        if (forkEventBuilders.get(flowFileRecord) != null) {
+            for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+                final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+                removeTemporaryClaim(childRecord);
+                createdFlowFiles.remove(uuid);
+                records.remove(childRecord.getCurrent().getId());
+            }
+        }
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+        record.setTransferRelationship(null);
+        record.setDestination(record.getOriginalQueue());
+
+        builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+        record.getUpdatedAttributes().clear();
Review comment:
We should not be calling `clear()` here. This is attempting to modify
internal state of another component, which could have unexpected consequences.
And it will definitely fail in some cases.
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+ private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                               final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+         return false;
+     }
+
+     if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+         return retryCounts < processorNode.getRetryCounts();
+     }
+
+     if (forkEventBuilders.get(currentFlowFile) != null) {
+         for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+             if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                 return retryCounts < processorNode.getRetryCounts();
+             }
+         }
+     }
+     return false;
+ }
+
+ private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                             final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                             final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+     removeTemporaryClaim(record);
+     if (forkEventBuilders.get(flowFileRecord) != null) {
+         for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+             final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+             removeTemporaryClaim(childRecord);
+             createdFlowFiles.remove(uuid);
+             records.remove(childRecord.getCurrent().getId());
+         }
+     }
+
+     final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+     record.setTransferRelationship(null);
+     record.setDestination(record.getOriginalQueue());
+
+     builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+     record.getUpdatedAttributes().clear();
+     final FlowFileRecord newFile = builder.build();
+     record.setWorking(newFile, false);
+     return newFile;
+ }
+
+ private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                        final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     final int numDestinations = context.getConnections(relationship).size();
+     final int multiplier = Math.max(1, numDestinations);
+     final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+     final List<String> childFlowFileIds = new ArrayList<>();
+     int contentSize = 0;
+
+     if (eventBuilder != null) {
+         childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+         for (String uuid : childFlowFileIds) {
+             contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+         }
+     }
+
+     flowFilesIn--;
+     contentSizeIn -= record.getOriginal().getSize();
+     flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+     contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+ }
+
+ private void clearEvents(final FlowFileRecord flowFileRecord) {
Review comment:
What events is this clearing? Perhaps `clearProvenanceEvents` makes more
sense?
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -2015,6 +2156,18 @@ public FlowFile penalize(FlowFile flowFile) {
return newFile;
}
+ public FlowFile penalize(FlowFile flowFile, long period, TimeUnit timeUnit) {
Review comment:
Should probably update the previous penalize() call to just call
`penalize(flowFile, context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)`, no?
I.e., make use of this method rather than repeating the logic.
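The delegation being suggested looks roughly like this; `PenalizeSketch` and its fields are simplified stand-ins for the session and its configured penalization period, not the actual NiFi classes:

```java
import java.util.concurrent.TimeUnit;

public class PenalizeSketch {
    private final long defaultPeriodMillis;
    private long appliedPenaltyMillis; // what the last penalize() call applied

    public PenalizeSketch(final long defaultPeriodMillis) {
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    // No-arg variant delegates to the general overload with the configured default,
    // mirroring the reviewer's suggestion for the existing penalize() method.
    public void penalize() {
        penalize(defaultPeriodMillis, TimeUnit.MILLISECONDS);
    }

    // Single place where the penalization logic actually lives.
    public void penalize(final long period, final TimeUnit timeUnit) {
        appliedPenaltyMillis = timeUnit.toMillis(period);
    }

    public long getAppliedPenaltyMillis() {
        return appliedPenaltyMillis;
    }
}
```

Keeping the logic in one overload means a future change (say, clamping the period) only needs to happen in one place.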
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+ private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                               final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+         return false;
+     }
+
+     if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+         return retryCounts < processorNode.getRetryCounts();
+     }
+
+     if (forkEventBuilders.get(currentFlowFile) != null) {
+         for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+             if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                 return retryCounts < processorNode.getRetryCounts();
+             }
+         }
+     }
+     return false;
+ }
+
+ private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                             final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                             final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+     removeTemporaryClaim(record);
+     if (forkEventBuilders.get(flowFileRecord) != null) {
+         for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+             final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+             removeTemporaryClaim(childRecord);
+             createdFlowFiles.remove(uuid);
+             records.remove(childRecord.getCurrent().getId());
+         }
+     }
+
+     final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+     record.setTransferRelationship(null);
+     record.setDestination(record.getOriginalQueue());
+
+     builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+     record.getUpdatedAttributes().clear();
+     final FlowFileRecord newFile = builder.build();
+     record.setWorking(newFile, false);
+     return newFile;
+ }
+
+ private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                        final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     final int numDestinations = context.getConnections(relationship).size();
+     final int multiplier = Math.max(1, numDestinations);
+     final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+     final List<String> childFlowFileIds = new ArrayList<>();
+     int contentSize = 0;
+
+     if (eventBuilder != null) {
+         childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+         for (String uuid : childFlowFileIds) {
+             contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+         }
+     }
+
+     flowFilesIn--;
+     contentSizeIn -= record.getOriginal().getSize();
+     flowFilesOut -= multiplier * (childFlowFileIds.size() + 1);
+     contentSizeOut -= (record.getCurrent().getSize() + contentSize) * multiplier;
+ }
+
+ private void clearEvents(final FlowFileRecord flowFileRecord) {
+     Set<ProvenanceEventRecord> events = provenanceReporter.getEvents().stream()
+         .filter(event -> event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())))
+         .collect(Collectors.toSet());
+
+     for (ProvenanceEventRecord event : events) {
+         provenanceReporter.remove(event);
+     }
+
+     forkEventBuilders.remove(flowFileRecord);
+ }
+
+ private long calculateBackoffTime(int retryCounts, final long maxBackoffPeriod, final long penalizationTime) {
+     if (retryCounts == 0) {
+         return penalizationTime;
+     }
+     long current = calculateBackoffTime(--retryCounts, maxBackoffPeriod, penalizationTime) * 2;
+     if (current >= maxBackoffPeriod) {
+         return maxBackoffPeriod;
+     }
+     return current;
Review comment:
Simpler (and clearer) to just `return Math.min(current, maxBackoffPeriod);`?
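With the cap written as `Math.min`, the recursion can also be unrolled into a loop, which avoids the recursive calls and any overflow from repeated doubling once the cap is reached. A sketch with the same semantics as the method in the diff (made static here for easy testing):

```java
public class RetryBackoff {
    // First attempt waits penalizationTime; each subsequent retry doubles the wait,
    // capped at maxBackoffPeriod. Equivalent to the recursive version in the diff.
    static long calculateBackoffTime(final int retryCount, final long maxBackoffPeriod, final long penalizationTime) {
        long backoff = penalizationTime;
        for (int i = 0; i < retryCount; i++) {
            backoff = Math.min(backoff * 2, maxBackoffPeriod);
        }
        return backoff;
    }
}
```

For example, with a 1-second penalization and a 60-second cap, retries wait 1s, 2s, 4s, 8s, ... until the cap is hit.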
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+ private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                               final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+         return false;
+     }
+
+     if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+         return retryCounts < processorNode.getRetryCounts();
+     }
+
+     if (forkEventBuilders.get(currentFlowFile) != null) {
+         for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
Review comment:
Rather than call `Map.get()` twice, the first time to compare to null,
the second time to operate on the return value, should call `Map.get()` once
and store value in a local variable. The call to `Map.get()` can be pretty
expensive if repeated over & over, due to the hashing that happens there.
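The pattern being asked for, in a self-contained form (the map and key names here are illustrative stand-ins, not the session's actual fields):

```java
import java.util.List;
import java.util.Map;

public class SingleLookup {
    // Call Map.get() once, hold the result in a local, and branch on the local,
    // instead of hashing the same key twice (once for the null check, once for use).
    static int childCount(final Map<String, List<String>> forkChildren, final String flowFileId) {
        final List<String> children = forkChildren.get(flowFileId); // single lookup
        return children == null ? 0 : children.size();
    }
}
```

Besides avoiding the second hash, the local also guards against the map changing between the two calls.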
##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -356,6 +408,95 @@ private void checkpoint(final boolean copyCollections) {
checkpoint.checkpoint(this, autoTerminatedEvents, copyCollections);
}
+ private boolean isRetryNeeded(final ProcessorNode processorNode, final StandardRepositoryRecord record, final FlowFileRecord currentFlowFile,
+                               final int retryCounts, final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     if (currentFlowFile == null || processorNode == null || processorNode.getRetriedRelationships().isEmpty()) {
+         return false;
+     }
+
+     if (processorNode.isRetriedRelationship(record.getTransferRelationship())) {
+         return retryCounts < processorNode.getRetryCounts();
+     }
+
+     if (forkEventBuilders.get(currentFlowFile) != null) {
+         for (String uuid : forkEventBuilders.get(currentFlowFile).getChildFlowFileIds()) {
+             if (processorNode.isRetriedRelationship(uuidsToRecords.get(uuid).getTransferRelationship())) {
+                 return retryCounts < processorNode.getRetryCounts();
+             }
+         }
+     }
+     return false;
+ }
+
+ private FlowFileRecord updateFlowFileRecord(final StandardRepositoryRecord record,
+                                             final Map<String, StandardRepositoryRecord> uuidsToRecords,
+                                             final int retryCounts, final FlowFileRecord flowFileRecord) {
+
+     removeTemporaryClaim(record);
+     if (forkEventBuilders.get(flowFileRecord) != null) {
+         for (String uuid : forkEventBuilders.get(flowFileRecord).getChildFlowFileIds()) {
+             final StandardRepositoryRecord childRecord = uuidsToRecords.get(uuid);
+             removeTemporaryClaim(childRecord);
+             createdFlowFiles.remove(uuid);
+             records.remove(childRecord.getCurrent().getId());
+         }
+     }
+
+     final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(flowFileRecord);
+     record.setTransferRelationship(null);
+     record.setDestination(record.getOriginalQueue());
+
+     builder.addAttribute("retryCounts", String.valueOf(retryCounts));
+     record.getUpdatedAttributes().clear();
+     final FlowFileRecord newFile = builder.build();
+     record.setWorking(newFile, false);
+     return newFile;
+ }
+
+ private void adjustProcessorStatistics(final StandardRepositoryRecord record, final Relationship relationship,
+                                        final Map<String, StandardRepositoryRecord> uuidsToRecords) {
+     final int numDestinations = context.getConnections(relationship).size();
+     final int multiplier = Math.max(1, numDestinations);
+     final ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(record.getOriginal());
+     final List<String> childFlowFileIds = new ArrayList<>();
+     int contentSize = 0;
+
+     if (eventBuilder != null) {
+         childFlowFileIds.addAll(eventBuilder.getChildFlowFileIds());
+         for (String uuid : childFlowFileIds) {
+             contentSize += uuidsToRecords.get(uuid).getCurrent().getSize();
+         }
+     }
+
+     flowFilesIn--;
+     contentSizeIn -= record.getOriginal().getSize();
Review comment:
I'm a little torn about this: if we take in a FlowFile and then retry it, should we count it as a FlowFile that came in? On the one hand I think we should, because if we process it 3 times, that should indicate that 3 FlowFiles came in. But then we might show 5 FlowFiles in and 2 FlowFiles out when there are retries, which might lead to even more confusion.
I think this is okay, but I believe it warrants an update to the User Guide to explain that if a FlowFile is retried, it does not get counted toward the Input Counts.
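For reference, the rollback arithmetic under discussion, as a standalone sketch (field and parameter names are simplified stand-ins for the session's counters):

```java
public class RetryStatsRollback {
    // Sketch of the bookkeeping in adjustProcessorStatistics: when a transfer is
    // retried, the session backs out what the earlier transfer added to the stats.
    int flowFilesIn;
    int flowFilesOut;
    long contentSizeIn;
    long contentSizeOut;

    void rollback(final int numDestinations, final int childCount,
                  final long originalSize, final long currentSize, final long childContentSize) {
        final int multiplier = Math.max(1, numDestinations); // fan-out to multiple connections
        flowFilesIn--;
        contentSizeIn -= originalSize;
        flowFilesOut -= multiplier * (childCount + 1);       // parent plus forked children
        contentSizeOut -= (currentSize + childContentSize) * multiplier;
    }
}
```

So with 2 destination connections and 1 child, a single retry backs out 1 input FlowFile and 4 output FlowFiles, which is the asymmetry the comment is weighing.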
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]