This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 711f2f1188 NIFI-12137: This closes #7808. Fixed name of Expression
Language Scope from LIMITED to ENVIRONMENT; fixed issue in which flowfiles
routed to failure by FlowFileTransform were also being routed to original;
defined 'identifier' and 'logger' as member variables for FlowFileTransform and
RecordTransform so that IDEs know about them
711f2f1188 is described below
commit 711f2f11888b9ea9eace8816758ad1d0d2075aa5
Author: Mark Payne <[email protected]>
AuthorDate: Thu Sep 28 10:09:10 2023 -0400
NIFI-12137: This closes #7808. Fixed name of Expression Language Scope from
LIMITED to ENVIRONMENT; fixed issue in which flowfiles routed to failure by
FlowFileTransform were also being routed to original; defined 'identifier' and
'logger' as member variables for FlowFileTransform and RecordTransform so that
IDEs know about them
Signed-off-by: Joseph Witt <[email protected]>
---
.../apache/nifi/groups/StandardProcessGroup.java | 5 +++++
.../python/processor/FlowFileTransformProxy.java | 12 +++++++----
.../python/processor/PythonProcessorProxy.java | 19 +++++++++---------
.../python/processor/RecordTransformProxy.java | 23 +++++++++++-----------
.../main/python/src/nifiapi/flowfiletransform.py | 4 ++++
.../src/main/python/src/nifiapi/properties.py | 7 ++++---
.../src/main/python/src/nifiapi/recordtransform.py | 4 ++++
7 files changed, 45 insertions(+), 29 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index c56a779e9e..a5d0d388ed 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1236,6 +1236,11 @@ public final class StandardProcessGroup implements
ProcessGroup {
conn.verifyCanDelete();
}
+ // Avoid performing any more validation on the processor, as it is
no longer necessary and may
+ // cause issues with Python-based Processor, as validation may
trigger, attempting to communicate
+ // with the Python process even after the Python process has been
destroyed.
+ processor.pauseValidationTrigger();
+
try (final NarCloseable x =
NarCloseable.withComponentNarLoader(extensionManager,
processor.getProcessor().getClass(), processor.getIdentifier())) {
final StandardProcessContext processContext = new
StandardProcessContext(processor, controllerServiceProvider,
getStateManager(processor.getIdentifier()), () -> false,
nodeTypeProvider);
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
index 50bc4ae70d..8ebf00eb19 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
@@ -17,6 +17,7 @@
package org.apache.nifi.python.processor;
+import java.util.Map;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -27,8 +28,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import py4j.Py4JNetworkException;
-import java.util.Map;
-
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
public class FlowFileTransformProxy extends PythonProcessorProxy {
@@ -76,6 +75,13 @@ public class FlowFileTransformProxy extends
PythonProcessorProxy {
session.transfer(original, REL_FAILURE);
return;
}
+ final String relationshipName = result.getRelationship();
+ final Relationship relationship = new
Relationship.Builder().name(relationshipName).build();
+ if (REL_FAILURE.getName().equals(relationshipName)) {
+ session.remove(transformed);
+ session.transfer(original, REL_FAILURE);
+ return;
+ }
final Map<String, String> attributes = result.getAttributes();
if (attributes != null) {
@@ -87,8 +93,6 @@ public class FlowFileTransformProxy extends
PythonProcessorProxy {
transformed = session.write(transformed, out ->
out.write(contents));
}
- final String relationshipName = result.getRelationship();
- final Relationship relationship = new
Relationship.Builder().name(relationshipName).build();
session.transfer(transformed, relationship);
session.transfer(original, REL_ORIGINAL);
}
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
index a2de5bebdd..987c75e074 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
@@ -17,15 +17,6 @@
package org.apache.nifi.python.processor;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -34,6 +25,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
public abstract class PythonProcessorProxy extends AbstractProcessor {
private final PythonProcessorBridge bridge;
@@ -124,7 +123,7 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor {
// We cache this to avoid having to call into the Python side while
the Processor is running. However, once
// it is stopped, its relationships may change due to properties, etc.
final Set<Relationship> relationships =
fetchRelationshipsFromPythonProcessor();
- this.cachedRelationships = Collections.unmodifiableSet(new
HashSet<>(relationships));
+ this.cachedRelationships = Set.copyOf(relationships);
}
@OnScheduled
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
index 3914f13e46..a9e400a004 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
@@ -17,6 +17,17 @@
package org.apache.nifi.python.processor;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -44,18 +55,6 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
public class RecordTransformProxy extends PythonProcessorProxy {
private final PythonProcessorBridge bridge;
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py
index 78fecf79d9..4b2f47d17e 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/flowfiletransform.py
@@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext
class FlowFileTransform(ABC):
+ # These will be set by the PythonProcessorAdapter when the component is
created
+ identifier = None
+ logger = None
+
def __init__(self):
self.arrayList = JvmHolder.jvm.java.util.ArrayList
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
index d5f3e153d9..885447914c 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
@@ -22,7 +22,7 @@ EMPTY_ALLOWABLE_VALUE_ARRAY =
JvmHolder.gateway.new_array(JvmHolder.jvm.org.apac
class ExpressionLanguageScope(Enum):
NONE = 1
- LIMITED = 2
+ ENVIRONMENT = 2
FLOWFILE_ATTRIBUTES = 3
@@ -133,8 +133,9 @@ class PropertyDescriptor:
:param expression_language_scope: documents the scope in which
Expression Language is valid. This value must be specified as one of the enum
values
in
`nifiapi.properties.ExpressionLanguageScope`. A value of `NONE` indicates that
Expression Language will not be evaluated for this property.
This is the default. A value of
`FLOWFILE_ATTRIBUTES` indicates that FlowFile attributes may be referenced when
configuring the property value.
- A value of `LIMITED` indicates that
Expression Language may be used but may not reference FlowFile attributes. For
example, a value of `${now()}` might
- be used to reference the current
date and time, or `${hostname(true)}` might be used to specify the hostname.
+ A value of `ENVIRONMENT` indicates
that Expression Language may be used and may reference environment variables
but may not reference FlowFile attributes.
+ For example, a value of `${now()}`
might be used to reference the current date and time, or `${hostname(true)}`
might be used to specify the hostname.
+ Or a value of `${ENV_VAR}` could be
used to reference an environment variable named `ENV_VAR`.
:param dynamic: whether or not this Property Descriptor represents a
dynamic (aka user-defined) property. This is not necessary to specify, as the
framework can determine this.
However, it is available if there is a desire to
explicitly set it for completeness' sake.
:param validators: A list of property validators that can be used to
ensure that the user-supplied value is valid. The standard validators can be
referenced using the
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py
index 28036a1dbc..7685378f78 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/recordtransform.py
@@ -19,6 +19,10 @@ from nifiapi.properties import ProcessContext
from nifiapi.__jvm__ import JvmHolder
class RecordTransform(ABC):
+ # These will be set by the PythonProcessorAdapter when the component is
created
+ identifier = None
+ logger = None
+
def __init__(self):
self.arrayList = JvmHolder.jvm.java.util.ArrayList