Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173350325
  
    --- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final 
ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = 
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of 
the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and 
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, 
livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + 
result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) 
{
    +                        // If no code was provided, assume it is in the 
content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, 
charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, 
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, 
ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> 
out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the 
output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending 
the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> 
out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new 
JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", 
LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = 
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, 
headers);
    +
    +                    flowFile = session.write(flowFile, out -> 
out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence 
consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" 
+context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : 
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Could you please check the description @ 
https://issues.apache.org/jira/browse/NIFI-4946
    
    The assumption was made such that it doesn't break existing code flow and 
at the same time we wanted to know the status of the submitted job.
    
    So the naive idea was to re-route the Livy Json response back to Spark 
processor only, so that it can get last submitted `url` from the custom 
(tampered) JSON response, wait for user specified wait time and again query the 
Livy for the Job status in a loop till it succeeds or fails.
    
    So when the processor is configured to submit a Spark job, it will expect 
the incoming flowfile to be an custom Json response with an `url` field to 
query the Livy, if not it is considered as a triggering point nothing else.
    
    I am open for any ideas from your end.
    
    Thanks.


---

Reply via email to