[
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405074#comment-16405074
]
ASF GitHub Bot commented on NIFI-4946:
--------------------------------------
Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r175496952
--- Diff:
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final
ProcessSession session) thro
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
- String code =
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(code)) {
- try (InputStream inputStream = session.read(flowFile)) {
- // If no code was provided, assume it is in the content of
the incoming flow file
- code = IOUtils.toString(inputStream, charset);
- } catch (IOException ioe) {
- log.error("Error reading input flowfile, penalizing and
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- code = StringEscapeUtils.escapeJavaScript(code);
- String payload = "{\"code\":\"" + code + "\"}";
+
try {
- final JSONObject result = submitAndHandleJob(livyUrl,
livySessionService, sessionId, payload, statusCheckInterval);
- log.debug("ExecuteSparkInteractive Result of Job Submit: " +
result);
- if (result == null) {
- session.transfer(flowFile, REL_FAILURE);
- } else {
+
+ if (isBatchJob) {
+
+ String jsonResponse = null;
+
+ if (StringUtils.isEmpty(jsonResponse)) {
+ try (InputStream inputStream = session.read(flowFile))
{
+ // If no code was provided, assume it is in the
content of the incoming flow file
+ jsonResponse = IOUtils.toString(inputStream,
charset);
+ } catch (IOException ioe) {
+ log.error("Error reading input flowfile,
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()},
ioe);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ log.debug(" ====> jsonResponse: " + jsonResponse);
+
try {
- final JSONObject output = result.getJSONObject("data");
- flowFile = session.write(flowFile, out ->
out.write(output.toString().getBytes()));
- flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- session.transfer(flowFile, REL_SUCCESS);
- } catch (JSONException je) {
- // The result doesn't contain the data, just send the
output object as the flow file content to failure (after penalizing)
- log.error("Spark Session returned an error, sending
the output JSON object as the flow file content to failure (after penalizing)");
- flowFile = session.write(flowFile, out ->
out.write(result.toString().getBytes()));
+
+ final JSONObject jsonResponseObj = new
JSONObject(jsonResponse);
+
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type",
LivySessionService.APPLICATION_JSON);
+ headers.put("X-Requested-By", LivySessionService.USER);
+ headers.put("Accept", "application/json");
+
+ JSONObject jobInfo =
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService,
headers);
+
+ flowFile = session.write(flowFile, out ->
out.write(jobInfo.toString().getBytes()));
flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- flowFile = session.penalize(flowFile);
+
+ Thread.sleep(statusCheckInterval);
+
+ String state = jobInfo.getString("state");
+ log.debug(" ====> jsonResponseObj State: " + state);
+
+ switch (state) {
+ case "success":
+ log.debug(" ====> success State: " + state);
+ session.transfer(flowFile, REL_SUCCESS);
+ break;
+ case "dead":
+ log.debug(" ====> dead State: " + state);
+ session.transfer(flowFile, REL_FAILURE);
+ break;
+ default:
+ log.debug(" ====> default State: " + state);
+ session.transfer(flowFile, REL_WAIT);
+ break;
+ }
+
+ } catch (JSONException | InterruptedException e) {
+
+ //Incoming flow file is not an JSON file hence
consider it to be an triggering point
+
+ String batchPayload = "{ \"pyFiles\": [\""
+context.getProperty(PY_FILES).getValue()+ "\"], " +
+ "\"file\" :
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
--- End diff --
Sometime this week I am planning to add support for jar files, args and
application name over the Livy options.
The catch here is unlike plain Spark code, batch process code will take
time to finish which is expected one as we know. So as a hack I was re-routing
the Json response after batch submission to itself, where I poll the incoming
flowfile and check whether it is a Json file and if so I will try to get the
"livy url" to post again to know the status of the batch job as long as it
runs. After knowing the the job finished, the success route is triggered.
That was the reason the I have made an assumption if the incoming file is
Json, it is from previous batch job submission.
In short, the flow file :
- Is considered as a triggering point (or)
- Is considered as plain Spark code that compiles over Livy (or)
- Is a Livy Json response, which can further be used to check the status of
long running Spark batch job
I was looking for the right Nifi way of handling this?
I feel I am too conservative and trying to fit all the functionalities on
one processor.
- Flow file/property can be used to run a Spark code
- Pyfiles can be used to run Spark batch job
- Jars can be used to run Spark batch job
- Args options for batch mode
- By rerouting the success to itself, it can monitor the long running job
over Livy rest APIs
> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
> Reporter: Mageswaran
> Priority: Major
> Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark based Sparks jobs (which is normally
> structured as modules) over Livy on existing "ExecuteSparkInteractive"
> processor.
> This is done by reading file paths for pyfiles and file and an option from
> user whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
> *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521
> )|https://github.com/apache/nifi/pull/2521]*
> * Check whether the processor has to handle code or submit a Spark job
> * Read incoming flow file
> ** If batch == true
> *** If flow file matches Livy `batches` JSON response through `wait` loop
> **** Wait for Status Check Interval
> **** Read the state
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> *** Else
> **** Ignore the flow file
> **** Trigger the Spark job over Livy `batches` endpoint
> **** Read the state of the submitted job
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> ** Else:
> *** Existing Logic to handle `Code`
>
> !nifi-spark-options.png!
> !nifi-spark.png!
>
> Thanks.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)