[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391551#comment-16391551
 ] 

ASF GitHub Bot commented on NIFI-4946:
--------------------------------------

Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173223531
  
    --- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final 
ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = 
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of 
the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and 
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, 
livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + 
result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) 
{
    +                        // If no code was provided, assume it is in the 
content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, 
charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, 
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, 
ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> 
out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the 
output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending 
the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> 
out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new 
JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", 
LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = 
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, 
headers);
    +
    +                    flowFile = session.write(flowFile, out -> 
out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence 
consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" 
+context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : 
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    This is confusing to me. Why are we saying that if the incoming flowfile is 
not valid JSON, we go ahead with the assumption that it is going to be 
PySpark? I mean, the assumption here lacks clarity. Please correct me if I'm 
wrong.


> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
>                 Key: NIFI-4946
>                 URL: https://issues.apache.org/jira/browse/NIFI-4946
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>         Environment: Ubuntu 16.04, IntelliJ
>            Reporter: Mageswaran
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark-based Spark jobs (which are normally 
> structured as modules) over Livy to the existing "ExecuteSparkInteractive" 
> processor.
> This is done by reading the file paths for pyfiles and file, plus a user 
> option indicating whether the processor should trigger a batch job.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
>  * Check whether the processor has to handle code or submit a Spark job
>  * Read incoming flow file
>  ** If batch == true
>  *** If flow file matches Livy `batches` JSON response through `wait` loop
>  **** Wait for Status Check Interval
>  **** Read the state
>  **** If state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  *** Else
>  **** Ignore the flow file
>  **** Trigger the Spark job over Livy `batches` endpoint
>  **** Read the state of the submitted job
>  **** If state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  ** Else:
>  *** Existing Logic to handle `Code`
>  
> !nifi-spark-options.png!
> !nifi-spark.png!
>  
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to