Alexander Lex created NIFI-11861:
------------------------------------
Summary: Multi Thread Issue with Custom Processor
Key: NIFI-11861
URL: https://issues.apache.org/jira/browse/NIFI-11861
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Affects Versions: 1.19.1
Reporter: Alexander Lex
We have a custom processor that pulls a flow file from the session, along with
a string value from an attribute ("uuid").
The processor writes out two flow files (part and header):
    public static final PropertyDescriptor UUID = new PropertyDescriptor.Builder()
        .name("uuid")
        .displayName("UUID")
        .description("UUID")
        .defaultValue("${metadata.uuid}")
        .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
            AttributeExpression.ResultType.STRING, true))
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .required(true)
        .build();

    @Override
    protected void init(final ProcessorInitializationContext context) {
        this.descriptors = List.of(RECORD_WRITER, UUID);
        this.relationships = Set.of(REL_PART, REL_FAILURE, REL_PARSE_FAILURE, REL_HEADER);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null)
            return;

        // We intern the value to dereference it - make a local copy
        String uuid = context.getProperty(UUID)
            .evaluateAttributeExpressions(flowFile).getValue().intern();

        FlowFile partFlowFile = session.create(flowFile);
        FlowFile headerFlowFile = session.create(flowFile);
        Map<Relationship, FlowFile> relationshipToFlowFile = Map.of(
            REL_PART, partFlowFile,
            REL_HEADER, headerFlowFile);

        OutputStream headerOutputstream = session.write(headerFlowFile);
        OutputStream partOutputstream = session.write(partFlowFile);
        Map<Relationship, OutputStream> relationshipToOutputStream = Map.of(
            REL_PART, partOutputstream,
            REL_HEADER, headerOutputstream);
        (...)
The uuid is embedded in the flow file content.
When we run this processor on a multi-node NiFi cluster with multiple threads,
the part and header flow files occasionally have different uuids.
We suspect that the threads somehow mix up the uuid of one incoming flow file
across different outgoing flow files.
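
To narrow this down, a framework-free harness along the following lines might help. It is only a sketch under assumptions: the elided part of onTrigger is not shown above, so the class SplitterSketch and its split method are hypothetical stand-ins, not our processor. NiFi can call onTrigger on the same processor instance from several threads at once when more than one concurrent task is configured, so the sketch shares one instance across a thread pool and checks that the part and header outputs of each call carry the same uuid. Because split keeps the uuid in local variables only, no mismatch is expected here; swapping in logic that stores per-call state in instance fields would be one way to test whether that reproduces the symptom.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UuidIsolationSketch {

    // Hypothetical stand-in for the processor logic (not the real processor).
    // It holds no per-invocation state in fields, only in local variables.
    static final class SplitterSketch {
        // Returns {partContent, headerContent}; both embed the given uuid.
        String[] split(String uuid) {
            String part = "part:" + uuid;
            String header = "header:" + uuid;
            return new String[] {part, header};
        }
    }

    // Runs `tasks` invocations against ONE shared instance on `threads` threads
    // and returns how many invocations produced a part/header uuid mismatch.
    static int countMismatches(int threads, int tasks) {
        SplitterSketch shared = new SplitterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final String uuid = UUID.randomUUID().toString();
                Callable<Boolean> task = () -> {
                    String[] out = shared.split(uuid);
                    // Strip the "part:" / "header:" prefixes and compare the uuids.
                    String partUuid = out[0].substring("part:".length());
                    String headerUuid = out[1].substring("header:".length());
                    return partUuid.equals(uuid) && headerUuid.equals(uuid);
                };
                results.add(pool.submit(task));
            }
            int mismatches = 0;
            for (Future<Boolean> f : results) {
                try {
                    if (!f.get()) {
                        mismatches++;
                    }
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return mismatches;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(8, 10_000));
    }
}
```

The thread count and task count passed in main are arbitrary values chosen for illustration.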