YolandaMDavis commented on a change in pull request #3913: NIFI-6925: Fixed 
JoltTransformRecord for RecordReaders, improved MockProcessSession
URL: https://github.com/apache/nifi/pull/3913#discussion_r353996386
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 ##########
 @@ -296,39 +297,37 @@ public void onTrigger(final ProcessContext context, 
ProcessSession session) thro
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         final RecordSchema schema;
+        FlowFile transformed = null;
+
         try (final InputStream in = session.read(original);
              final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
             schema = writerFactory.getSchema(original.getAttributes(), 
reader.getSchema());
 
-            FlowFile transformed = session.create(original);
             final Map<String, String> attributes = new HashMap<>();
             final WriteResult writeResult;
+            transformed = session.create(original);
+
+            // We want to transform the first record before creating the 
Record Writer. We do this because the Record will likely end up with a 
different structure
+            // and therefore a difference Schema after being transformed. As a 
result, we want to transform the Record and then provide the transformed schema 
to the
+            // Record Writer so that if the Record Writer chooses to inherit 
the Record Schema from the Record itself, it will inherit the transformed 
schema, not the
+            // schema determined by the Record Reader.
+            final Record firstRecord = reader.nextRecord();
+            if (firstRecord == null) {
+                try (final OutputStream out = session.write(transformed);
+                     final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
 
-            try {
-                // We want to transform the first record before creating the 
Record Writer. We do this because the Record will likely end up with a 
different structure
-                // and therefore a difference Schema after being transformed. 
As a result, we want to transform the Record and then provide the transformed 
schema to the
-                // Record Writer so that if the Record Writer chooses to 
inherit the Record Schema from the Record itself, it will inherit the 
transformed schema, not the
-                // schema determined by the Record Reader.
-                final Record firstRecord = reader.nextRecord();
-                if (firstRecord == null) {
-                    try (final OutputStream out = session.write(transformed);
-                         final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
-
-                        writer.beginRecordSet();
-                        writeResult = writer.finishRecordSet();
-
-                        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                        attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                        attributes.putAll(writeResult.getAttributes());
-                    }
+                    writer.beginRecordSet();
+                    writeResult = writer.finishRecordSet();
 
-                    transformed = session.putAllAttributes(transformed, 
attributes);
-                    session.transfer(transformed, REL_SUCCESS);
-                    session.transfer(original, REL_ORIGINAL);
-                    logger.info("{} had no Records to transform", new 
Object[]{original});
-                    return;
+                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                    attributes.putAll(writeResult.getAttributes());
                 }
 
+                transformed = session.putAllAttributes(transformed, 
attributes);
+                logger.info("{} had no Records to transform", new 
Object[]{original});
+            } else {
+
                 final JoltTransform transform = getTransform(context, 
original);
                 final Record transformedFirstRecord = transform(firstRecord, 
transform);
 
 Review comment:
   If the transform fails here, transformedFirstRecord is returned as null, which 
causes a NullPointerException on the next line for writeSchema. Recommend adding 
a null check.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to