pvillard31 opened a new pull request, #10387:
URL: https://github.com/apache/nifi/pull/10387
# Summary
NIFI-13402 - Multiple flowfiles as output for Python Processors
### Context
The current Python bridge allows each invocation of a Python Processor to
return exactly one `FlowFileSourceResult` or `FlowFileTransformResult`. This
works for simple use cases but prevents Python processors from emitting
multiple FlowFiles in a single trigger, which is common in scenarios such as
pagination and splitting.
This PR makes the minimal changes needed across the Java bridge and the
Python helper library to support multiple results while preserving backward
compatibility.
### Goals
- Allow Python source processors to emit zero, one, or many FlowFiles per
invocation without additional Java code
- Allow Python transform processors to emit zero, one, or many FlowFiles
derived from the incoming FlowFile in a single `onTrigger` call
- Preserve existing processor behavior and API surface for implementations
that continue to return a single result object
- Provide predictable handling of the original FlowFile when multiple
results are returned
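To make the source-side goal concrete, here is a minimal sketch of a paginating source that yields one result per page in a single invocation. It assumes the helper accepts a generator returned from the processor's `create(self, context)` hook (the hook name mirrors NiFi's `FlowFileSource` API, but treat it as an assumption). `FlowFileSourceResult` is a local stand-in so the snippet runs without NiFi, and `fetch_page`, `FAKE_PAGES` and `PaginatedSource` are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional, Tuple


@dataclass
class FlowFileSourceResult:
    # Local stand-in so the sketch runs outside NiFi; a real processor would
    # import FlowFileSourceResult from the nifiapi package instead.
    relationship: str
    attributes: Optional[Dict[str, str]] = None
    contents: Optional[bytes] = None


# Hypothetical paginated API: page number -> (items, next page or None).
FAKE_PAGES: Dict[int, Tuple[List[str], Optional[int]]] = {
    0: (["a", "b"], 1),
    1: (["c"], 2),
    2: (["d", "e"], None),
}


def fetch_page(page: int) -> Tuple[List[str], Optional[int]]:
    return FAKE_PAGES[page]


class PaginatedSource:
    """Hypothetical source processor that emits one FlowFile per page."""

    def create(self, context) -> Iterator[FlowFileSourceResult]:
        page: Optional[int] = 0
        while page is not None:
            items, next_page = fetch_page(page)
            # One result per page; every page is produced in this single call.
            yield FlowFileSourceResult(
                relationship="success",
                attributes={"page.number": str(page), "item.count": str(len(items))},
                contents="\n".join(items).encode("utf-8"),
            )
            page = next_page


results = list(PaginatedSource().create(context=None))
print([r.attributes["page.number"] for r in results])  # ['0', '1', '2']
```

Using a generator keeps the processor code linear; per the non-goals, the bridge would still materialize all pages into memory for the call.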
### Non-Goals
- Supporting Java-side streaming of arbitrarily large result sequences
(iterables will be materialised into memory per call)
- Changing RecordTransform behavior
### Python Return Contract
Processors may return either:
- a single `FlowFile*Result` instance (current behaviour), or
- any iterable (list, tuple, generator) containing `FlowFile*Result`
instances
Returning `None` continues to mean "no output".
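A minimal sketch of a transform using the new contract: it returns a list with one result per non-blank line, or `None` when there is nothing to emit. `FlowFileTransformResult` here is a local stand-in for the nifiapi class, `StubFlowFile` is a hypothetical object exposing a `getContentsAsBytes()` accessor modeled on NiFi's input FlowFile, and `SplitLines` plus the `fragment.*` attribute names are illustrative only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FlowFileTransformResult:
    # Local stand-in for nifiapi's FlowFileTransformResult (not the real class).
    relationship: str
    contents: Optional[bytes] = None
    attributes: Optional[Dict[str, str]] = None


class StubFlowFile:
    # Hypothetical stand-in exposing a getContentsAsBytes() accessor.
    def __init__(self, data: bytes):
        self._data = data

    def getContentsAsBytes(self) -> bytes:
        return self._data


class SplitLines:
    """Hypothetical transform: one output FlowFile per non-blank line."""

    def transform(self, context, flowfile) -> Optional[List[FlowFileTransformResult]]:
        text = flowfile.getContentsAsBytes().decode("utf-8")
        lines = [line for line in text.splitlines() if line.strip()]
        if not lines:
            return None  # still means "no output"
        return [
            FlowFileTransformResult(
                relationship="success",
                contents=line.encode("utf-8"),
                attributes={"fragment.index": str(i), "fragment.count": str(len(lines))},
            )
            for i, line in enumerate(lines)
        ]


out = SplitLines().transform(None, StubFlowFile(b"alpha\n\nbeta\ngamma\n"))
print([r.contents for r in out])  # [b'alpha', b'beta', b'gamma']
```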
Python helper classes (`FlowFileTransform` and `FlowFileSource`) will
normalise generators into lists before returning to Py4J to ensure predictable
conversion.
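The normalization step can be sketched as a small helper applied to whatever the processor returns. `normalize_result` is a hypothetical name, and copying lists and tuples into a fresh list is an assumption added so that only one collection shape crosses the bridge; the PR itself only states that generators are materialized.

```python
from collections.abc import Iterator


def normalize_result(value):
    """Hypothetical helper modeling the normalization step described above.

    - None passes through unchanged and still means "no output".
    - Generators and other one-shot iterators are materialized into a list so
      the Java side receives a concrete collection it can size and re-read.
    - Lists and tuples are copied into a list (an assumption, not from the PR).
    - Anything else (a single result object, or an invalid type such as a
      plain string) is returned as-is for the Java side to accept or reject.
    """
    if isinstance(value, (list, tuple, Iterator)):
        return list(value)
    return value
```

Because `str` is iterable but not an `Iterator`, a plain string is left untouched rather than being exploded into characters, so the Java side can still reject it as an invalid result type.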
### FlowFileSourceProxy Changes
- Detect when the bridge returns a `Collection` or `Iterator` and iterate
over each element, creating and transferring a FlowFile per result
- Maintain the current fast-path for single results to avoid overhead
- Continue to honour the implicit `success` relationship while allowing
per-result relationship names
- `FlowFileSourceResult.free()` is invoked for every element to avoid
leaking proxies
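The source-side dispatch rules can be modeled in a few lines. This is a Python model of the behavior listed above, not the actual Java `FlowFileSourceProxy` code: `SourceResult`, `dispatch_source_results` and the `transfer` callback are hypothetical, and `TypeError` stands in for `ProcessException`.

```python
import logging
from collections.abc import Collection, Iterator

log = logging.getLogger("source-proxy-model")


class SourceResult:
    """Stand-in for the Py4J proxy of a FlowFileSourceResult; records free()."""

    def __init__(self, relationship=None, contents=b""):
        self.relationship = relationship
        self.contents = contents
        self.freed = False

    def free(self):
        self.freed = True


def _emit(result, transfer):
    try:
        # A missing relationship name falls back to the implicit "success".
        transfer(result.relationship or "success", result.contents)
    finally:
        result.free()  # release the proxy even if the transfer fails


def dispatch_source_results(returned, transfer):
    """Model of how the proxy could fan out one bridge return value.

    `transfer(relationship, contents)` stands in for creating, writing and
    transferring a FlowFile. Returns the number of FlowFiles emitted.
    """
    if returned is None:
        return 0
    if isinstance(returned, SourceResult):  # fast path: single result
        _emit(returned, transfer)
        return 1
    if isinstance(returned, (str, bytes)) or not isinstance(returned, (Collection, Iterator)):
        raise TypeError(f"Unsupported result type: {type(returned).__name__}")
    emitted = 0
    for item in returned:
        if item is None:
            log.warning("Skipping None result")
            continue
        if not isinstance(item, SourceResult):
            raise TypeError(f"Unsupported element type: {type(item).__name__}")
        _emit(item, transfer)
        emitted += 1
    return emitted
```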
### FlowFileTransformProxy Changes
- Fan out the incoming FlowFile when multiple results are returned by the
Python side
- Default behaviour when more than one result is provided:
  - clone the original FlowFile once per result
  - apply the same attribute/content updates that are currently performed
  - transfer each clone to its requested relationship
- Single-result behaviour remains unchanged: mutate the original FlowFile
in-place and transfer to the requested relationship, cloning only if the
`original` relationship is expected (matching existing logic)
- The original FlowFile will be routed to `REL_ORIGINAL` when that
relationship is not auto-terminated and at least one result was processed. If
the relationship is auto-terminated, the clone is removed, as in the current
implementation
- Any result that requests the `failure` relationship causes the original
FlowFile to be routed to failure once, mirroring today's semantics. Additional
`failure` results in the same trigger are ignored with a warning to avoid
double-transfer attempts
- Invalid result types (for example, returning plain strings) will trigger a
`ProcessException` with a detailed message
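The transform routing rules above can be modeled as a pure function that returns the transfers for one trigger. This is a hedged Python model, not the `FlowFileTransformProxy` implementation: `FlowFile`, `Result`, `route_transform_results` and the string relationship names are stand-ins, `copy.deepcopy` plays the role of cloning in the session, and two behaviors the description leaves open are assumptions here: successful clones are still transferred when a `failure` result is present, and an empty result set drops the original.

```python
import copy
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

log = logging.getLogger("transform-proxy-model")


class ProcessException(Exception):
    """Stand-in for NiFi's ProcessException."""


@dataclass
class FlowFile:
    # Minimal stand-in for a FlowFile held by a ProcessSession.
    contents: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


@dataclass
class Result:
    # Stand-in for a FlowFileTransformResult as seen by the proxy.
    relationship: str
    contents: Optional[bytes] = None
    attributes: Optional[Dict[str, str]] = None


def _apply(flowfile: FlowFile, result: Result) -> None:
    if result.contents is not None:
        flowfile.contents = result.contents
    if result.attributes:
        flowfile.attributes.update(result.attributes)


def route_transform_results(original: FlowFile, returned,
                            original_auto_terminated: bool = False) -> List[Tuple[str, FlowFile]]:
    """Return the (relationship, FlowFile) transfers for one trigger, in order.

    An empty list means nothing is transferred and the original is dropped
    (a modeling assumption).
    """
    if isinstance(returned, (list, tuple)):
        candidates = list(returned)
    else:
        candidates = [] if returned is None else [returned]
    valid: List[Result] = []
    for r in candidates:
        if r is None:
            log.warning("Skipping None result")
        elif isinstance(r, Result):
            valid.append(r)
        else:
            raise ProcessException(f"Invalid result type: {type(r).__name__}")

    transfers: List[Tuple[str, FlowFile]] = []
    if len(valid) == 1:  # single result: unchanged in-place behavior
        result = valid[0]
        if result.relationship == "failure":
            return [("failure", original)]
        if not original_auto_terminated:
            transfers.append(("original", copy.deepcopy(original)))  # clone before mutating
        _apply(original, result)
        transfers.append((result.relationship, original))
        return transfers

    failed = False
    for result in valid:  # several results: one clone per non-failure result
        if result.relationship == "failure":
            if failed:
                log.warning("Ignoring additional failure result in the same trigger")
            failed = True
            continue
        clone = copy.deepcopy(original)
        _apply(clone, result)
        transfers.append((result.relationship, clone))
    if failed:
        transfers.append(("failure", original))  # original routed to failure once
    elif valid and not original_auto_terminated:
        transfers.append(("original", original))
    return transfers
```

Modeling the proxy as a function that only returns transfers keeps the routing rules testable in isolation from session mechanics.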
### Backward Compatibility
- Existing processors that return a single `FlowFile*Result` continue to
work without modification
- Processors that currently return a single result wrapped in a list will
behave the same after this change
- The default cloning behaviour for multi-result transforms mirrors NiFi’s
Java processors like `SplitText`
### Error Handling
- Null or empty iterables result in no FlowFiles being transferred
- In mixed results, `None` entries are skipped with a warning rather than
failing the entire batch
- Any exception raised while handling a specific result (for example, a write
failure) rolls back the session and is rethrown as a `ProcessException`
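The error-handling rules can be sketched against a buffering session: `None` entries are skipped, and a failure on any single result discards everything staged for the trigger. `FakeSession`, `handle_results` and the `write` callback are hypothetical; the explicit `commit()` is a modeling convenience, since in NiFi the framework normally commits the session.

```python
import logging

log = logging.getLogger("error-handling-model")


class ProcessException(Exception):
    """Stand-in for NiFi's ProcessException."""


class FakeSession:
    """Hypothetical session that buffers transfers until commit."""

    def __init__(self):
        self.pending = []
        self.committed = []
        self.rolled_back = False

    def transfer(self, relationship, contents):
        self.pending.append((relationship, contents))

    def rollback(self):
        self.pending.clear()
        self.rolled_back = True

    def commit(self):
        self.committed.extend(self.pending)
        self.pending.clear()


def handle_results(session, results, write):
    """Apply the error-handling rules to one batch of results.

    `write(result)` is a hypothetical per-result step returning the bytes to
    transfer; it may raise. Returns the number of FlowFiles committed.
    """
    if results is None:  # no output
        return 0
    emitted = 0
    for index, result in enumerate(results):  # an empty iterable transfers nothing
        if result is None:
            log.warning("Skipping None result at index %d", index)
            continue
        try:
            session.transfer("success", write(result))
        except Exception as exc:
            session.rollback()  # discard everything staged for this trigger
            raise ProcessException(f"Failed to handle result {index}: {exc}") from exc
        emitted += 1
    session.commit()
    return emitted
```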
# Tracking
Please complete the following tracking steps prior to pull request creation.
### Issue Tracking
- [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue
created
### Pull Request Tracking
- [ ] Pull Request title starts with Apache NiFi Jira issue number, such as
`NIFI-00000`
- [ ] Pull Request commit message starts with Apache NiFi Jira issue number,
such as `NIFI-00000`
### Pull Request Formatting
- [ ] Pull Request based on current revision of the `main` branch
- [ ] Pull Request refers to a feature branch with one commit containing
changes
# Verification
Please indicate the verification steps performed prior to pull request
creation.
### Build
- [ ] Build completed using `./mvnw clean install -P contrib-check`
- [ ] JDK 21
- [ ] JDK 25
### Licensing
- [ ] New dependencies are compatible with the [Apache License
2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License
Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE`
files
### Documentation
- [ ] Documentation formatting appears as expected in rendered files