[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392281#comment-16392281
 ] 

ASF GitHub Bot commented on NIFI-4946:
--------------------------------------

Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173350325
  
    --- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final 
ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = 
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of 
the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and 
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, 
livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + 
result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) 
{
    +                        // If no code was provided, assume it is in the 
content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, 
charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, 
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, 
ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> 
out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the 
output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending 
the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> 
out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new 
JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", 
LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = 
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, 
headers);
    +
    +                    flowFile = session.write(flowFile, out -> 
out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence 
consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" 
+context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : 
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Could you please check the description @ 
https://issues.apache.org/jira/browse/NIFI-4946
    
    The assumption was made such that it doesn't break existing code flow and 
at the same time we wanted to know the status of the submitted job.
    
    So the naive idea was to re-route the Livy Json response back to Spark 
processor only, so that it can get last submitted `url` from the custom 
(tampered) JSON response, wait for user specified wait time and again query the 
Livy for the Job status in a loop till it succeeds or fails.
    
    So when the processor is configured to submit a Spark job, it will expect 
the incoming flowfile to be an custom Json response with an `url` field to 
query the Livy, if not it is considered as a triggering point nothing else.
    
    I am open for any ideas from your end.
    
    Thanks.


> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
>                 Key: NIFI-4946
>                 URL: https://issues.apache.org/jira/browse/NIFI-4946
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>         Environment: Ubuntu 16.04, IntelliJ
>            Reporter: Mageswaran
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark based Sparks jobs (which is normally 
> structured as modules) over Livy on existing "ExecuteSparkInteractive" 
> processor.
> This is done by reading file paths for pyfiles and file and an option from 
> user whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 
> )|https://github.com/apache/nifi/pull/2521]*
>  * Check whether the processor has to handle code or submit a Spark job
>  * Read incoming flow file
>  ** If batch == true
>  *** If flow file matches Livy `batches` JSON response through `wait` loop
>  **** Wait for Status Check Interval
>  **** Read the state
>  **** If state is `running` route it to `wait` or if it  is `success` or 
> `dead` route it accordingly
>  *** Else
>  **** Ignore the flow file
>  **** Trigger the Spark job over Livy `batches` endpoint
>  **** Read the state of the submitted job
>  **** If state is `running` route it to `wait` or if it  is `success` or 
> `dead` route it accordingly
>  ** Else:
>  *** Existing Logic to handle `Code`
>  
> !nifi-spark-options.png!
> !nifi-spark.png!
>  
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to