YolandaMDavis commented on a change in pull request #3913: NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession
URL: https://github.com/apache/nifi/pull/3913#discussion_r353988951
 
 

 ##########
 File path: nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 ##########
 @@ -353,27 +352,32 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro
 
                     writeResult = writer.finishRecordSet();
 
+                    try {
+                        writer.close();
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Failed to close Writer for {}", new Object[]{transformed});
+                    }
+
                     attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                     attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                     attributes.putAll(writeResult.getAttributes());
                 }
-            } catch (Exception e) {
-                logger.error("Unable to write transformed records {} due to {}", new Object[]{original, e.toString(), e});
-                session.remove(transformed);
-                session.transfer(original, REL_FAILURE);
-                return;
-            }
 
-            final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
-            transformed = session.putAllAttributes(transformed, attributes);
-            session.transfer(transformed, REL_SUCCESS);
-            session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(original, REL_ORIGINAL);
-            logger.debug("Transformed {}", new Object[]{original});
+                final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
+                transformed = session.putAllAttributes(transformed, attributes);
+                session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                logger.debug("Transformed {}", new Object[]{original});
+            }
         } catch (final Exception ex) {
             logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex});
             session.transfer(original, REL_FAILURE);
+            session.remove(transformed);
 
 Review comment:
   Recommend adding a null check here. In testing I saw a case where, if an
exception is encountered, the transformed object is still null at this point.
When session.remove is then called on it, the call throws a
NullPointerException.
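
   A minimal sketch of the guard being suggested, runnable on its own. The
`StubSession` class and `cleanUpAfterFailure` helper are hypothetical stand-ins
written for this illustration, not NiFi classes; the stub only models the
observed behavior that removing a null reference fails. In the processor itself
the equivalent change would be wrapping `session.remove(transformed)` in an
`if (transformed != null)` check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NullSafeRemoveSketch {

    /** Hypothetical stand-in for the one session method used here (not the NiFi API). */
    public static class StubSession {
        public final List<String> removed = new ArrayList<>();

        /** Models the reported failure: a null argument raises a NullPointerException. */
        public void remove(String flowFile) {
            Objects.requireNonNull(flowFile, "flowFile");
            removed.add(flowFile);
        }
    }

    /** Failure-path cleanup: only remove the transformed FlowFile if it was ever created. */
    public static void cleanUpAfterFailure(StubSession session, String transformed) {
        if (transformed != null) {
            session.remove(transformed);
        }
    }

    public static void main(String[] args) {
        StubSession session = new StubSession();

        // The exception happened before 'transformed' was assigned: nothing to remove.
        cleanUpAfterFailure(session, null);

        // The exception happened after 'transformed' was created: it is removed.
        cleanUpAfterFailure(session, "transformed-flowfile");

        System.out.println(session.removed);
    }
}
```

   Without the guard, the first call would throw from inside the catch block
and mask the original exception.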

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
