Github user Mageswaran1989 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2521#discussion_r173348709
--- Diff:
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final
ProcessSession session) thro
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
- String code =
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(code)) {
- try (InputStream inputStream = session.read(flowFile)) {
- // If no code was provided, assume it is in the content of
the incoming flow file
- code = IOUtils.toString(inputStream, charset);
- } catch (IOException ioe) {
- log.error("Error reading input flowfile, penalizing and
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- code = StringEscapeUtils.escapeJavaScript(code);
- String payload = "{\"code\":\"" + code + "\"}";
+
try {
- final JSONObject result = submitAndHandleJob(livyUrl,
livySessionService, sessionId, payload, statusCheckInterval);
- log.debug("ExecuteSparkInteractive Result of Job Submit: " +
result);
- if (result == null) {
- session.transfer(flowFile, REL_FAILURE);
- } else {
+
+ if (isBatchJob) {
+
+ String jsonResponse = null;
+
+ if (StringUtils.isEmpty(jsonResponse)) {
+ try (InputStream inputStream = session.read(flowFile))
{
+ // If no code was provided, assume it is in the
content of the incoming flow file
+ jsonResponse = IOUtils.toString(inputStream,
charset);
+ } catch (IOException ioe) {
+ log.error("Error reading input flowfile,
penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()},
ioe);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ log.debug(" ====> jsonResponse: " + jsonResponse);
--- End diff --
Sure, this will be removed in the next commit.
---