YolandaMDavis commented on a change in pull request #3913: NIFI-6925: Fixed
JoltTransformRecord for RecordReaders, improved MockProcessSession
URL: https://github.com/apache/nifi/pull/3913#discussion_r353996386
##########
File path:
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
##########
@@ -296,39 +297,37 @@ public void onTrigger(final ProcessContext context,
ProcessSession session) thro
final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema;
+ FlowFile transformed = null;
+
try (final InputStream in = session.read(original);
final RecordReader reader =
readerFactory.createRecordReader(original, in, getLogger())) {
schema = writerFactory.getSchema(original.getAttributes(),
reader.getSchema());
- FlowFile transformed = session.create(original);
final Map<String, String> attributes = new HashMap<>();
final WriteResult writeResult;
+ transformed = session.create(original);
+
+ // We want to transform the first record before creating the
Record Writer. We do this because the Record will likely end up with a
different structure
+ // and therefore a difference Schema after being transformed. As a
result, we want to transform the Record and then provide the transformed schema
to the
+ // Record Writer so that if the Record Writer chooses to inherit
the Record Schema from the Record itself, it will inherit the transformed
schema, not the
+ // schema determined by the Record Reader.
+ final Record firstRecord = reader.nextRecord();
+ if (firstRecord == null) {
+ try (final OutputStream out = session.write(transformed);
+ final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
- try {
- // We want to transform the first record before creating the
Record Writer. We do this because the Record will likely end up with a
different structure
- // and therefore a difference Schema after being transformed.
As a result, we want to transform the Record and then provide the transformed
schema to the
- // Record Writer so that if the Record Writer chooses to
inherit the Record Schema from the Record itself, it will inherit the
transformed schema, not the
- // schema determined by the Record Reader.
- final Record firstRecord = reader.nextRecord();
- if (firstRecord == null) {
- try (final OutputStream out = session.write(transformed);
- final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
-
- writer.beginRecordSet();
- writeResult = writer.finishRecordSet();
-
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- }
+ writer.beginRecordSet();
+ writeResult = writer.finishRecordSet();
- transformed = session.putAllAttributes(transformed,
attributes);
- session.transfer(transformed, REL_SUCCESS);
- session.transfer(original, REL_ORIGINAL);
- logger.info("{} had no Records to transform", new
Object[]{original});
- return;
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
}
+ transformed = session.putAllAttributes(transformed,
attributes);
+ logger.info("{} had no Records to transform", new
Object[]{original});
+ } else {
+
final JoltTransform transform = getTransform(context,
original);
final Record transformedFirstRecord = transform(firstRecord,
transform);
Review comment:
If the transform fails here, transformedFirstRecord is returned as null, which
causes a NullPointerException on the next line for writeSchema. Recommend
adding a null check.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services