Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r175496952
--- Diff:
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final
ProcessSession session) thro
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
- String code =
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(code)) {
- try (InputStream inputStream = session.read(flowFile)) {
- // If no code was provided, assume it is in the content of
the incoming flow file
- code = IOUtils.toString(inputStream, charset);
- } catch (IOException ioe) {
- log.error("Error reading input flowfile, penalizing and
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- code = StringEscapeUtils.escapeJavaScript(code);
- String payload = "{\"code\":\"" + code + "\"}";
+
try {
- final JSONObject result = submitAndHandleJob(livyUrl,
livySessionService, sessionId, payload, statusCheckInterval);
- log.debug("ExecuteSparkInteractive Result of Job Submit: " +
result);
- if (result == null) {
- session.transfer(flowFile, REL_FAILURE);
- } else {
+
+ if (isBatchJob) {
+
+ String jsonResponse = null;
+
+ if (StringUtils.isEmpty(jsonResponse)) {
+ try (InputStream inputStream = session.read(flowFile))
{
+ // If no code was provided, assume it is in the
content of the incoming flow file
+ jsonResponse = IOUtils.toString(inputStream,
charset);
+ } catch (IOException ioe) {
+ log.error("Error reading input flowfile,
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()},
ioe);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ log.debug(" ====> jsonResponse: " + jsonResponse);
+
try {
- final JSONObject output = result.getJSONObject("data");
- flowFile = session.write(flowFile, out ->
out.write(output.toString().getBytes()));
- flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- session.transfer(flowFile, REL_SUCCESS);
- } catch (JSONException je) {
- // The result doesn't contain the data, just send the
output object as the flow file content to failure (after penalizing)
- log.error("Spark Session returned an error, sending
the output JSON object as the flow file content to failure (after penalizing)");
- flowFile = session.write(flowFile, out ->
out.write(result.toString().getBytes()));
+
+ final JSONObject jsonResponseObj = new
JSONObject(jsonResponse);
+
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type",
LivySessionService.APPLICATION_JSON);
+ headers.put("X-Requested-By", LivySessionService.USER);
+ headers.put("Accept", "application/json");
+
+ JSONObject jobInfo =
readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService,
headers);
+
+ flowFile = session.write(flowFile, out ->
out.write(jobInfo.toString().getBytes()));
flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
- flowFile = session.penalize(flowFile);
+
+ Thread.sleep(statusCheckInterval);
+
+ String state = jobInfo.getString("state");
+ log.debug(" ====> jsonResponseObj State: " + state);
+
+ switch (state) {
+ case "success":
+ log.debug(" ====> success State: " + state);
+ session.transfer(flowFile, REL_SUCCESS);
+ break;
+ case "dead":
+ log.debug(" ====> dead State: " + state);
+ session.transfer(flowFile, REL_FAILURE);
+ break;
+ default:
+ log.debug(" ====> default State: " + state);
+ session.transfer(flowFile, REL_WAIT);
+ break;
+ }
+
+ } catch (JSONException | InterruptedException e) {
+
+ //Incoming flow file is not an JSON file hence
consider it to be an triggering point
+
+ String batchPayload = "{ \"pyFiles\": [\""
+context.getProperty(PY_FILES).getValue()+ "\"], " +
+ "\"file\" :
\""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
--- End diff --
Sometime this week I am planning to add support for jar files, args and
application name over the Livy options.
The catch here is unlike plain Spark code, batch process code will take
time to finish which is expected one as we know. So as a hack I was re-routing
the Json response after batch submission to itself, where I poll the incoming
flowfile and check whether it is a Json file and if so I will try to get the
"livy url" to post again to know the status of the batch job as long as it
runs. After knowing the the job finished, the success route is triggered.
That was the reason the I have made an assumption if the incoming file is
Json, it is from previous batch job submission.
In short, the flow file :
- Is considered as a triggering point (or)
- Is considered as plain Spark code that compiles over Livy (or)
- Is a Livy Json response, which can further be used to check the status of
long running Spark batch job
I was looking for the right Nifi way of handling this?
I feel I am too conservative and trying to fit all the functionalities on
one processor.
- Flow file/property can be used to run a Spark code
- Pyfiles can be used to run Spark batch job
- Jars can be used to run Spark batch job
- Args options for batch mode
- By rerouting the success to itself, it can monitor the long running job
over Livy rest APIs
---