This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e9e6b1f98b NIFI-14197 Fixed Exception Handling for FlowFileFilter with
Queued FlowFiles (#9668)
e9e6b1f98b is described below
commit e9e6b1f98b6bbdc1c8b8d99c5be2f67d2da2119c
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 29 16:09:28 2025 -0500
NIFI-14197 Fixed Exception Handling for FlowFileFilter with Queued
FlowFiles (#9668)
- Ensure that any FlowFiles that are pulled from the FlowFileQueue during
ProcessSession.get(FlowFileFilter) are placed back on the queue if an Exception
is thrown by the filter
Signed-off-by: David Handermann <[email protected]>
---
.../controller/queue/SwappablePriorityQueue.java | 12 +++-
.../clustered/TestSwappablePriorityQueue.java | 34 +++++++++
.../system/ThrowExceptionInFlowFileFilter.java | 81 ++++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processor/ExceptionInFlowFileFilterIT.java | 61 ++++++++++++++++
5 files changed, 188 insertions(+), 1 deletion(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index d88ed12bf4..0790923aa6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -637,7 +637,16 @@ public class SwappablePriorityQueue {
break; // just stop searching because the rest are all
penalized.
}
- final FlowFileFilterResult result = filter.filter(flowFile);
+ final FlowFileFilterResult result;
+ try {
+ result = filter.filter(flowFile);
+ } catch (final Throwable t) {
+ unselected.add(flowFile);
+ activeQueue.addAll(unselected);
+ activeQueue.addAll(selectedFlowFiles);
+ throw t;
+ }
+
if (result.isAccept()) {
bytesPulled += flowFile.getSize();
flowFilesPulled++;
@@ -653,6 +662,7 @@ public class SwappablePriorityQueue {
}
this.activeQueue.addAll(unselected);
+
unacknowledge(flowFilesPulled, bytesPulled);
if (flowFilesExpired > 0) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index d52dc05bf0..68bab06e16 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -27,7 +27,9 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.util.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -183,6 +185,38 @@ public class TestSwappablePriorityQueue {
}
}
+ @Test
+ public void testExceptionInPollAllowsReprocessing() {
+ for (int i = 0; i < 3; i++) {
+ final MockFlowFileRecord flowFile = new
MockFlowFileRecord(Map.of("i", String.valueOf(i)), i);
+ queue.put(flowFile);
+ }
+
+ assertThrows(RuntimeException.class, () -> {
+ queue.poll(new FlowFileFilter() {
+ private int count = 0;
+
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ if (count == 0) {
+ count++;
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
+ throw new RuntimeException("Intentional Unit Test
Exception");
+ }
+ }, Set.of(), 0L);
+ });
+
+ // Ensure that all FlowFiles are still accessible and are in the
proper order.
+ for (int i = 0; i < 3; i++) {
+ final FlowFileRecord flowFile = queue.poll(Set.of(), 0L);
+ assertNotNull(flowFile, "FlowFile was null at iteration " + i);
+ assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+ }
+
+ assertNull(queue.poll(Set.of(), 0L));
+ }
+
@Test
public void testPrioritizersDataAddedAfterSwapOccurs() {
final FlowFilePrioritizer iAttributePrioritizer = (o1, o2) -> {
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
new file mode 100644
index 0000000000..a93d065bc0
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ThrowExceptionInFlowFileFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Set;
+
+public class ThrowExceptionInFlowFileFilter extends AbstractProcessor {
+
+ static final PropertyDescriptor THROW_EXCEPTION = new
PropertyDescriptor.Builder()
+ .name("Throw Exception")
+ .description("If true, the processor will throw an exception for each
FlowFile that is processed")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ THROW_EXCEPTION
+ );
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles are routed to this Relationship")
+ .build();
+
+ private static final Set<Relationship> RELATIONSHIPS = Set.of(
+ REL_SUCCESS
+ );
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final boolean throwException =
context.getProperty(THROW_EXCEPTION).asBoolean();
+
+ final List<FlowFile> flowFiles = session.get(new FlowFileFilter() {
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ if (throwException) {
+ throw new ProcessException("Throwing exception as
configured");
+ }
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
+ });
+
+ session.transfer(flowFiles, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0b344481fc..1c68a41f36 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -46,6 +46,7 @@ org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.SplitTextByLine
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.TransferBatch
+org.apache.nifi.processors.tests.system.ThrowExceptionInFlowFileFilter
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.UpdateContent
org.apache.nifi.processors.tests.system.UnzipFlowFile
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
new file mode 100644
index 0000000000..c90856d092
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/ExceptionInFlowFileFilterIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ExceptionInFlowFileFilterIT extends NiFiSystemIT {
+
+ @Test
+ public void testFlowFilesRemainAccessible() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ final ProcessorEntity throwException =
getClientUtil().createProcessor("ThrowExceptionInFlowFileFilter");
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ final ConnectionEntity generateToException =
getClientUtil().createConnection(generate, throwException, "success");
+ final ConnectionEntity exceptionToTerminate =
getClientUtil().createConnection(throwException, terminate, "success");
+
+ getClientUtil().waitForValidProcessor(generate.getId());
+ getClientUtil().waitForValidProcessor(throwException.getId());
+
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(generateToException, 1);
+ getClientUtil().startProcessor(throwException);
+
+ Thread.sleep(500L);
+ getClientUtil().stopProcessor(throwException);
+
+ getClientUtil().updateProcessorProperties(throwException,
Map.of("Throw Exception", "false"));
+ getClientUtil().waitForValidProcessor(throwException.getId());
+
+ assertEquals(1, getConnectionQueueSize(generateToException.getId()));
+ getClientUtil().startProcessor(throwException);
+
+ waitForQueueCount(exceptionToTerminate, 1);
+ }
+
+}