[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392281#comment-16392281 ]
ASF GitHub Bot commented on NIFI-4946:
--------------------------------------
Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r173350325
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
         String sessionId = livyController.get("sessionId");
         String livyUrl = livyController.get("livyUrl");
-        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(code)) {
-            try (InputStream inputStream = session.read(flowFile)) {
-                // If no code was provided, assume it is in the content of the incoming flow file
-                code = IOUtils.toString(inputStream, charset);
-            } catch (IOException ioe) {
-                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-        }
-        code = StringEscapeUtils.escapeJavaScript(code);
-        String payload = "{\"code\":\"" + code + "\"}";
+
         try {
-            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-            if (result == null) {
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+
+            if (isBatchJob) {
+
+                String jsonResponse = null;
+
+                if (StringUtils.isEmpty(jsonResponse)) {
+                    try (InputStream inputStream = session.read(flowFile)) {
+                        // If no code was provided, assume it is in the content of the incoming flow file
+                        jsonResponse = IOUtils.toString(inputStream, charset);
+                    } catch (IOException ioe) {
+                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                log.debug(" ====> jsonResponse: " + jsonResponse);
+
                 try {
-                    final JSONObject output = result.getJSONObject("data");
-                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    session.transfer(flowFile, REL_SUCCESS);
-                } catch (JSONException je) {
-                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+                    headers.put("X-Requested-By", LivySessionService.USER);
+                    headers.put("Accept", "application/json");
+
+                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                     flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    flowFile = session.penalize(flowFile);
+
+                    Thread.sleep(statusCheckInterval);
+
+                    String state = jobInfo.getString("state");
+                    log.debug(" ====> jsonResponseObj State: " + state);
+
+                    switch (state) {
+                        case "success":
+                            log.debug(" ====> success State: " + state);
+                            session.transfer(flowFile, REL_SUCCESS);
+                            break;
+                        case "dead":
+                            log.debug(" ====> dead State: " + state);
+                            session.transfer(flowFile, REL_FAILURE);
+                            break;
+                        default:
+                            log.debug(" ====> default State: " + state);
+                            session.transfer(flowFile, REL_WAIT);
+                            break;
+                    }
+
+                } catch (JSONException | InterruptedException e) {
+
+                    // Incoming flow file is not a JSON file, hence consider it to be a triggering point
+
+                    String batchPayload = "{ \"pyFiles\": [\"" + context.getProperty(PY_FILES).getValue() + "\"], " +
+                            "\"file\" : \"" + context.getProperty(MAIN_PY_FILE).getValue() + "\" }";
--- End diff ---
Could you please check the description at
https://issues.apache.org/jira/browse/NIFI-4946
The assumption was that the change should not break the existing code flow,
while at the same time letting us know the status of the submitted job.
So the naive idea was to re-route the Livy JSON response back to the Spark
processor only, so that it can read the last submitted `url` from the custom
(tampered) JSON response, wait for the user-specified wait time, and query Livy
for the job status again in a loop until it succeeds or fails.
So when the processor is configured to submit a Spark job, it expects the
incoming flow file to be a custom JSON response with a `url` field for querying
Livy; otherwise the flow file is treated purely as a trigger and nothing else.
I am open to any ideas from your end.
Thanks.
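For reference, the re-routing wait loop described above can be sketched as
below. This is a simplified, self-contained illustration, not the actual
processor code: `routeForState` mirrors the switch in the diff, the iterator
stands in for repeated GETs of the Livy batch `url` (one element per poll after
`statusCheckInterval`), and the class/method names and returned relationship
strings are assumptions for illustration only.

```java
import java.util.Iterator;
import java.util.List;

public class LivyBatchRouting {

    // Mirrors the switch in the diff: terminal "success" and "dead" states are
    // routed to the success/failure relationships; any other Livy state
    // ("starting", "running", ...) goes back to the wait relationship so the
    // flow file is re-routed to the processor and polled again.
    static String routeForState(String state) {
        switch (state) {
            case "success": return "REL_SUCCESS";
            case "dead":    return "REL_FAILURE";
            default:        return "REL_WAIT";
        }
    }

    // Stand-in for the loop formed by the wait relationship: consume observed
    // states until a terminal route is reached or no more polls are available.
    static String pollUntilTerminal(Iterator<String> observedStates) {
        String route = "REL_WAIT";
        while (observedStates.hasNext() && route.equals("REL_WAIT")) {
            route = routeForState(observedStates.next());
        }
        return route;
    }

    public static void main(String[] args) {
        // A job that starts, runs, then succeeds ends up on the success route.
        System.out.println(pollUntilTerminal(
                List.of("starting", "running", "success").iterator()));
        // A job that dies while running ends up on the failure route.
        System.out.println(pollUntilTerminal(
                List.of("running", "dead").iterator()));
    }
}
```

In the real processor the loop is external (the flow file cycles through the
`wait` relationship back into the processor), which is why each invocation only
sleeps once and routes on the single state it just read.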
> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
> Reporter: Mageswaran
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark-based Spark jobs (which are normally
> structured as modules) over Livy on the existing "ExecuteSparkInteractive"
> processor.
> This is done by reading file paths for `pyFiles` and `file`, plus an option
> from the user indicating whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
> *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
> * Check whether the processor has to handle code or submit a Spark job
> * Read incoming flow file
> ** If batch == true
> *** If flow file matches Livy `batches` JSON response through `wait` loop
> **** Wait for Status Check Interval
> **** Read the state
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> *** Else
> **** Ignore the flow file
> **** Trigger the Spark job over Livy `batches` endpoint
> **** Read the state of the submitted job
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> ** Else:
> *** Existing Logic to handle `Code`
>
> !nifi-spark-options.png!
> !nifi-spark.png!
>
> Thanks.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)