Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173213146
  
    --- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final 
ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = 
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of 
the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and 
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, 
livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + 
result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) 
{
    +                        // If no code was provided, assume it is in the 
content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, 
charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, 
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, 
ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    --- End diff --
    
    Cosmetic change: It would be great if this log.debug message can be changed 
to something of a proper standard, like "JSON Response : <the JSON response>" 
i.e. Remove ====> 


---

Reply via email to