[
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405038#comment-16405038
]
ASF GitHub Bot commented on NIFI-4946:
--------------------------------------
Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r175490223
--- Diff:
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final
ProcessSession session) thro
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
- String code =
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(code)) {
- try (InputStream inputStream = session.read(flowFile)) {
- // If no code was provided, assume it is in the content of
the incoming flow file
- code = IOUtils.toString(inputStream, charset);
- } catch (IOException ioe) {
- log.error("Error reading input flowfile, penalizing and
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- code = StringEscapeUtils.escapeJavaScript(code);
- String payload = "{\"code\":\"" + code + "\"}";
+
try {
- final JSONObject result = submitAndHandleJob(livyUrl,
livySessionService, sessionId, payload, statusCheckInterval);
- log.debug("ExecuteSparkInteractive Result of Job Submit: " +
result);
- if (result == null) {
- session.transfer(flowFile, REL_FAILURE);
- } else {
+
+ if (isBatchJob) {
+
+ String jsonResponse = null;
+
+ if (StringUtils.isEmpty(jsonResponse)) {
+ try (InputStream inputStream = session.read(flowFile))
{
+ // If no code was provided, assume it is in the
content of the incoming flow file
+ jsonResponse = IOUtils.toString(inputStream,
charset);
+ } catch (IOException ioe) {
+ log.error("Error reading input flowfile,
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()},
ioe);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ log.debug(" ====> jsonResponse: " + jsonResponse);
+
try {
- final JSONObject output = result.getJSONObject("data");
- flowFile = session.write(flowFile, out ->
out.write(output.toString().getBytes()));
- flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- session.transfer(flowFile, REL_SUCCESS);
- } catch (JSONException je) {
- // The result doesn't contain the data, just send the
output object as the flow file content to failure (after penalizing)
- log.error("Spark Session returned an error, sending
the output JSON object as the flow file content to failure (after penalizing)");
- flowFile = session.write(flowFile, out ->
out.write(result.toString().getBytes()));
+
+ final JSONObject jsonResponseObj = new
JSONObject(jsonResponse);
+
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type",
LivySessionService.APPLICATION_JSON);
+ headers.put("X-Requested-By", LivySessionService.USER);
+ headers.put("Accept", "application/json");
+
+ JSONObject jobInfo =
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService,
headers);
+
+ flowFile = session.write(flowFile, out ->
out.write(jobInfo.toString().getBytes()));
flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- flowFile = session.penalize(flowFile);
+
+ Thread.sleep(statusCheckInterval);
+
+ String state = jobInfo.getString("state");
+ log.debug(" ====> jsonResponseObj State: " + state);
+
+ switch (state) {
+ case "success":
+ log.debug(" ====> success State: " + state);
+ session.transfer(flowFile, REL_SUCCESS);
+ break;
+ case "dead":
+ log.debug(" ====> dead State: " + state);
+ session.transfer(flowFile, REL_FAILURE);
+ break;
+ default:
+ log.debug(" ====> default State: " + state);
+ session.transfer(flowFile, REL_WAIT);
+ break;
+ }
+
+ } catch (JSONException | InterruptedException e) {
+
+ //Incoming flow file is not an JSON file hence
consider it to be an triggering point
+
+ String batchPayload = "{ \"pyFiles\": [\""
+context.getProperty(PY_FILES).getValue()+ "\"], " +
+ "\"file\" :
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
--- End diff --
As per the code flow described at https://issues.apache.org/jira/browse/NIFI-4946,
I am currently able to send *.zip files (Python modules) through Livy. My
question is: what should we do with the flow file when the processor is used
to submit a batch job?
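
For reference, the batch payload in the diff above is assembled by string concatenation, so a path containing a quote or backslash would produce invalid JSON, and the whole PY_FILES value ends up as a single array element. A minimal sketch of an escaped alternative follows. The class LivyBatchPayload, its methods, and the example paths are hypothetical and not part of the pull request; only the `file` and `pyFiles` field names come from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: builds the JSON body for a Livy batch submission.
final class LivyBatchPayload {

    // Wraps a string in double quotes and escapes the characters JSON requires.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Emits {"file": ..., "pyFiles": [...]} with one array element per path.
    static String build(String mainFile, List<String> pyFiles) {
        String files = pyFiles.stream()
                .map(LivyBatchPayload::quote)
                .collect(Collectors.joining(", ", "[", "]"));
        return "{\"file\": " + quote(mainFile) + ", \"pyFiles\": " + files + "}";
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration.
        System.out.println(build("/jobs/main.py", Arrays.asList("/jobs/deps.zip", "/jobs/util.zip")));
    }
}
```

In the processor itself the same effect could probably be had with the JSONObject class the diff already uses; the hand-rolled escaping here only keeps the sketch free of dependencies.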
> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
> Reporter: Mageswaran
> Priority: Major
> Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark-based Spark jobs (which are normally
> structured as modules) over Livy in the existing "ExecuteSparkInteractive"
> processor.
> This is done by reading the file paths for pyfiles and file, plus an option from
> the user on whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
> *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
> * Check whether the processor has to handle code or submit a Spark job
> * Read incoming flow file
> ** If batch == true
> *** If flow file matches Livy `batches` JSON response through `wait` loop
> **** Wait for Status Check Interval
> **** Read the state
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> *** Else
> **** Ignore the flow file
> **** Trigger the Spark job over Livy `batches` endpoint
> **** Read the state of the submitted job
> **** If state is `running` route it to `wait` or if it is `success` or
> `dead` route it accordingly
> ** Else:
> *** Existing Logic to handle `Code`
>
> !nifi-spark-options.png!
> !nifi-spark.png!
>
> Thanks.
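
The state handling in the batch branch of the workflow quoted above can be sketched as a small pure function. This is only an illustration of the switch in the diff: the Route enum is a hypothetical stand-in for the processor's REL_SUCCESS, REL_FAILURE and REL_WAIT relationships, and, as in the diff, every state other than `success` or `dead` (including `running`) goes back to `wait`.

```java
// Hypothetical illustration, not part of the PR: maps a Livy batch state to a route.
final class BatchStateRouting {

    // Stand-ins for REL_SUCCESS, REL_FAILURE and REL_WAIT in the processor.
    enum Route { SUCCESS, FAILURE, WAIT }

    static Route routeFor(String state) {
        switch (state) {
            case "success":
                return Route.SUCCESS;
            case "dead":
                return Route.FAILURE;
            default:
                // "running" and any other state: re-check after the status check interval.
                return Route.WAIT;
        }
    }

    public static void main(String[] args) {
        for (String state : new String[] {"running", "success", "dead"}) {
            System.out.println(state + " -> " + routeFor(state));
        }
    }
}
```

Keeping the mapping separate from the session calls would make it straightforward to unit test without a running Livy server.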