Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r173350325
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
         String sessionId = livyController.get("sessionId");
         String livyUrl = livyController.get("livyUrl");
-        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(code)) {
-            try (InputStream inputStream = session.read(flowFile)) {
-                // If no code was provided, assume it is in the content of the incoming flow file
-                code = IOUtils.toString(inputStream, charset);
-            } catch (IOException ioe) {
-                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-        }
-        code = StringEscapeUtils.escapeJavaScript(code);
-        String payload = "{\"code\":\"" + code + "\"}";
+
         try {
-            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-            if (result == null) {
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+
+            if (isBatchJob) {
+
+                String jsonResponse = null;
+
+                if (StringUtils.isEmpty(jsonResponse)) {
+                    try (InputStream inputStream = session.read(flowFile)) {
+                        // If no code was provided, assume it is in the content of the incoming flow file
+                        jsonResponse = IOUtils.toString(inputStream, charset);
+                    } catch (IOException ioe) {
+                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                log.debug(" ====> jsonResponse: " + jsonResponse);
+
                 try {
-                    final JSONObject output = result.getJSONObject("data");
-                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    session.transfer(flowFile, REL_SUCCESS);
-                } catch (JSONException je) {
-                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+                    headers.put("X-Requested-By", LivySessionService.USER);
+                    headers.put("Accept", "application/json");
+
+                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                     flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    flowFile = session.penalize(flowFile);
+
+                    Thread.sleep(statusCheckInterval);
+
+                    String state = jobInfo.getString("state");
+                    log.debug(" ====> jsonResponseObj State: " + state);
+
+                    switch (state) {
+                        case "success":
+                            log.debug(" ====> success State: " + state);
+                            session.transfer(flowFile, REL_SUCCESS);
+                            break;
+                        case "dead":
+                            log.debug(" ====> dead State: " + state);
+                            session.transfer(flowFile, REL_FAILURE);
+                            break;
+                        default:
+                            log.debug(" ====> default State: " + state);
+                            session.transfer(flowFile, REL_WAIT);
+                            break;
+                    }
+
+                } catch (JSONException | InterruptedException e) {
+
+                    // Incoming flowfile is not a JSON file, hence consider it to be a triggering point
+
+                    String batchPayload = "{ \"pyFiles\": [\"" + context.getProperty(PY_FILES).getValue() + "\"], " +
+                            "\"file\" : \"" + context.getProperty(MAIN_PY_FILE).getValue() + "\" }";
--- End diff ---
Could you please check the description at https://issues.apache.org/jira/browse/NIFI-4946.
The assumption was made so that it doesn't break the existing code flow, while at the same time we wanted to know the status of the submitted job.
So the naive idea was to re-route the Livy JSON response back to the Spark processor itself, so that it can get the last submitted `url` from the custom (tampered) JSON response, wait for the user-specified wait time, and query Livy for the job status again, in a loop, until the job succeeds or fails.
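For illustration, that wait-and-requery loop could look roughly like this. This is a minimal sketch with hypothetical names, not the processor code: `fetchState` stands in for fetching the batch-status JSON from the submitted `url` and reading its `state` field, and `waitMs` for the user-specified wait time.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the wait-and-requery idea: keep asking Livy for the batch
// state until it reaches a terminal value ("success" or "dead").
public class LivyPollLoop {

    static String pollUntilTerminal(Supplier<String> fetchState, long waitMs)
            throws InterruptedException {
        String state = fetchState.get();
        while (!"success".equals(state) && !"dead".equals(state)) {
            Thread.sleep(waitMs);      // user-specified wait time
            state = fetchState.get();  // query Livy again
        }
        return state;                  // terminal: "success" or "dead"
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a job that is polled twice before succeeding.
        Iterator<String> states = List.of("starting", "running", "success").iterator();
        System.out.println(pollUntilTerminal(states::next, 5)); // prints "success"
    }
}
```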
So when the processor is configured to submit a Spark job, it will expect the incoming flowfile to be a custom JSON response with a `url` field for querying Livy; if it is not, the flowfile is considered a triggering point and nothing else.
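Put another way, the switch in the diff above reduces to a mapping from the Livy batch `state` to a relationship, with the wait relationship assumed to be wired back into the processor so the status gets re-checked after the configured interval. A minimal sketch, with relationship names shown as plain strings for illustration only:

```java
// Sketch: route a flowfile carrying the Livy batch-status JSON by its
// "state" field. "REL_WAIT" is assumed to loop back to this processor.
public class LivyStateRouting {

    static String routeFor(String state) {
        switch (state) {
            case "success":
                return "REL_SUCCESS"; // job finished: pass the result on
            case "dead":
                return "REL_FAILURE"; // job failed: route to failure
            default:
                return "REL_WAIT";    // still starting/running: poll again later
        }
    }

    public static void main(String[] args) {
        System.out.println(routeFor("running")); // prints "REL_WAIT"
        System.out.println(routeFor("success")); // prints "REL_SUCCESS"
        System.out.println(routeFor("dead"));    // prints "REL_FAILURE"
    }
}
```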
I am open to any ideas from your end.
Thanks.
---