This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e9e6b1f98b NIFI-14197 Fixed Exception Handling for FlowFileFilter with 
Queued FlowFiles (#9668)
e9e6b1f98b is described below

commit e9e6b1f98b6bbdc1c8b8d99c5be2f67d2da2119c
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 29 16:09:28 2025 -0500

    NIFI-14197 Fixed Exception Handling for FlowFileFilter with Queued 
FlowFiles (#9668)
    
    - Ensure that any FlowFiles that are pulled from the FlowFileQueue during 
ProcessSession.get(FlowFileFilter) are placed back on the queue if an Exception 
is thrown by the filter
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../controller/queue/SwappablePriorityQueue.java   | 12 +++-
 .../clustered/TestSwappablePriorityQueue.java      | 34 +++++++++
 .../system/ThrowExceptionInFlowFileFilter.java     | 81 ++++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |  1 +
 .../processor/ExceptionInFlowFileFilterIT.java     | 61 ++++++++++++++++
 5 files changed, 188 insertions(+), 1 deletion(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index d88ed12bf4..0790923aa6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -637,7 +637,16 @@ public class SwappablePriorityQueue {
                     break; // just stop searching because the rest are all 
penalized.
                 }
 
-                final FlowFileFilterResult result = filter.filter(flowFile);
+                final FlowFileFilterResult result;
+                try {
+                    result = filter.filter(flowFile);
+                } catch (final Throwable t) {
+                    unselected.add(flowFile);
+                    activeQueue.addAll(unselected);
+                    activeQueue.addAll(selectedFlowFiles);
+                    throw t;
+                }
+
                 if (result.isAccept()) {
                     bytesPulled += flowFile.getSize();
                     flowFilesPulled++;
@@ -653,6 +662,7 @@ public class SwappablePriorityQueue {
             }
 
             this.activeQueue.addAll(unselected);
+
             unacknowledge(flowFilesPulled, bytesPulled);
 
             if (flowFilesExpired > 0) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index d52dc05bf0..68bab06e16 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -27,7 +27,9 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.util.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -183,6 +185,38 @@ public class TestSwappablePriorityQueue {
         }
     }
 
+    @Test
+    public void testExceptionInPollAllowsReprocessing() {
+        for (int i = 0; i < 3; i++) {
+            final MockFlowFileRecord flowFile = new 
MockFlowFileRecord(Map.of("i", String.valueOf(i)), i);
+            queue.put(flowFile);
+        }
+
+        assertThrows(RuntimeException.class, () -> {
+            queue.poll(new FlowFileFilter() {
+                private int count = 0;
+
+                @Override
+                public FlowFileFilterResult filter(final FlowFile flowFile) {
+                    if (count == 0) {
+                        count++;
+                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    }
+                    throw new RuntimeException("Intentional Unit Test 
Exception");
+                }
+            }, Set.of(), 0L);
+        });
+
+        // Ensure that all FlowFiles are still accessible and are in the 
proper order.
+        for (int i = 0; i < 3; i++) {
+            final FlowFileRecord flowFile = queue.poll(Set.of(), 0L);
+            assertNotNull(flowFile, "FlowFile was null at iteration " + i);
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+        }
+
+        assertNull(queue.poll(Set.of(), 0L));
+    }
+
     @Test
     public void testPrioritizersDataAddedAfterSwapOccurs() {
         final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
new file mode 100644
index 0000000000..a93d065bc0
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Set;
+
+public class ThrowExceptionInFlowFileFilter extends AbstractProcessor {
+
+    static final PropertyDescriptor THROW_EXCEPTION = new 
PropertyDescriptor.Builder()
+        .name("Throw Exception")
+        .description("If true, the processor will throw an exception for each 
FlowFile that is processed")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+        THROW_EXCEPTION
+    );
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are routed to this Relationship")
+        .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+        REL_SUCCESS
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final boolean throwException = 
context.getProperty(THROW_EXCEPTION).asBoolean();
+
+        final List<FlowFile> flowFiles = session.get(new FlowFileFilter() {
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                if (throwException) {
+                    throw new ProcessException("Throwing exception as 
configured");
+                }
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+        });
+
+        session.transfer(flowFiles, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0b344481fc..1c68a41f36 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -46,6 +46,7 @@ org.apache.nifi.processors.tests.system.SplitByLine
 org.apache.nifi.processors.tests.system.SplitTextByLine
 org.apache.nifi.processors.tests.system.TerminateFlowFile
 org.apache.nifi.processors.tests.system.TransferBatch
+org.apache.nifi.processors.tests.system.ThrowExceptionInFlowFileFilter
 org.apache.nifi.processors.tests.system.ThrowProcessException
 org.apache.nifi.processors.tests.system.UpdateContent
 org.apache.nifi.processors.tests.system.UnzipFlowFile
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
new file mode 100644
index 0000000000..c90856d092
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ExceptionInFlowFileFilterIT extends NiFiSystemIT {
+
+    @Test
+    public void testFlowFilesRemainAccessible() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity throwException = 
getClientUtil().createProcessor("ThrowExceptionInFlowFileFilter");
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        final ConnectionEntity generateToException = 
getClientUtil().createConnection(generate, throwException, "success");
+        final ConnectionEntity exceptionToTerminate = 
getClientUtil().createConnection(throwException, terminate, "success");
+
+        getClientUtil().waitForValidProcessor(generate.getId());
+        getClientUtil().waitForValidProcessor(throwException.getId());
+
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(generateToException, 1);
+        getClientUtil().startProcessor(throwException);
+
+        Thread.sleep(500L);
+        getClientUtil().stopProcessor(throwException);
+
+        getClientUtil().updateProcessorProperties(throwException, 
Map.of("Throw Exception", "false"));
+        getClientUtil().waitForValidProcessor(throwException.getId());
+
+        assertEquals(1, getConnectionQueueSize(generateToException.getId()));
+        getClientUtil().startProcessor(throwException);
+
+        waitForQueueCount(exceptionToTerminate, 1);
+    }
+
+}

Reply via email to