http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
new file mode 100644
index 0000000..d388148
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import org.apache.nifi.controller.api.livy.LivySessionService;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"spark", "livy", "http", "execute"})
+@CapabilityDescription("Execute Spark Code over a Livy-managed HTTP session to 
a live Spark context. Supports cached RDD sharing.")
+public class ExecuteSparkInteractive extends AbstractProcessor {
+
+    public static final PropertyDescriptor LIVY_CONTROLLER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-livy-controller-service")
+            .displayName("Livy Controller Service")
+            .description("The controller service to use for Livy-managed 
session(s).")
+            .required(true)
+            .identifiesControllerService(LivySessionService.class)
+            .build();
+
+    public static final PropertyDescriptor CODE = new 
PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-code")
+            .displayName("Code")
+            .description("The code to execute in the session. This property 
can be empty, a constant value, or built from attributes "
+                    + "using Expression Language. If this property is 
specified, it will be used regardless of the content of "
+                    + "incoming flowfiles. If this property is empty, the 
content of the incoming flow file is expected "
+                    + "to contain valid code to be issued by the processor to 
the session. Note that Expression "
+                    + "Language is not evaluated for flow file contents.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /**
+     * Points to the charset name corresponding to the incoming flow file's 
encoding.
+     */
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-charset")
+            .displayName("Character Set")
+            .description("The character set encoding for the incoming flow 
file.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor STATUS_CHECK_INTERVAL = new 
PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-status-check-interval")
+            .displayName("Status Check Interval")
+            .description("The amount of time to wait between checking the 
status of an operation.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("1 sec")
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully processed are sent 
to this relationship")
+            .build();
+
+    public static final Relationship REL_WAIT = new Relationship.Builder()
+            .name("wait")
+            .description("FlowFiles that are waiting on an available Spark 
session will be sent to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to this relationship when they 
cannot be parsed")
+            .build();
+
+    private volatile List<PropertyDescriptor> properties;
+    private volatile Set<Relationship> relationships;
+
+
+    /**
+     * Builds the immutable lists of supported properties and relationships once, when the
+     * framework initializes the processor.
+     *
+     * @param context the initialization context supplied by the framework (not used here)
+     */
+    @Override
+    public void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(LIVY_CONTROLLER_SERVICE);
+        descriptors.add(CODE);
+        descriptors.add(CHARSET);
+        descriptors.add(STATUS_CHECK_INTERVAL);
+        this.properties = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_WAIT);
+        rels.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession 
session) throws ProcessException {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog log = getLogger();
+        final LivySessionService livySessionService = 
context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class);
+        final Map<String, String> livyController = 
livySessionService.getSession();
+        if (livyController == null || livyController.isEmpty()) {
+            log.debug("No Spark session available (yet), routing flowfile to 
wait");
+            session.transfer(flowFile, REL_WAIT);
+            return;
+        }
+        final long statusCheckInterval = 
context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+        Charset charset;
+        try {
+            charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+        } catch (Exception e) {
+            log.warn("Illegal character set name specified, defaulting to 
UTF-8");
+            charset = StandardCharsets.UTF_8;
+        }
+
+        String sessionId = livyController.get("sessionId");
+        String livyUrl = livyController.get("livyUrl");
+        String code = 
context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(code)) {
+            try (InputStream inputStream = session.read(flowFile)) {
+                // If no code was provided, assume it is in the content of the 
incoming flow file
+                code = IOUtils.toString(inputStream, charset);
+            } catch (IOException ioe) {
+                log.error("Error reading input flowfile, penalizing and 
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+        }
+
+        code = StringEscapeUtils.escapeJavaScript(code);
+        String payload = "{\"code\":\"" + code + "\"}";
+        try {
+            final JSONObject result = submitAndHandleJob(livyUrl, 
livySessionService, sessionId, payload, statusCheckInterval);
+            log.debug("********** ExecuteSparkInteractive Result of Job 
Submit: " + result);
+            if (result == null) {
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                try {
+                    final JSONObject output = result.getJSONObject("data");
+                    flowFile = session.write(flowFile, out -> 
out.write(output.toString().getBytes()));
+                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
+                    session.transfer(flowFile, REL_SUCCESS);
+                } catch (JSONException je) {
+                    // The result doesn't contain the data, just send the 
output object as the flow file content to failure (after penalizing)
+                    log.error("Spark Session returned an error, sending the 
output JSON object as the flow file content to failure (after penalizing)");
+                    flowFile = session.write(flowFile, out -> 
out.write(result.toString().getBytes()));
+                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+        } catch (IOException ioe) {
+            log.error("Failure processing flowfile {} due to {}, penalizing 
and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private JSONObject submitAndHandleJob(String livyUrl, LivySessionService 
livySessionService, String sessionId, String payload, long statusCheckInterval) 
throws IOException {
+        ComponentLog log = getLogger();
+        String statementUrl = livyUrl + "/sessions/" + sessionId + 
"/statements";
+        JSONObject output = null;
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+        headers.put("X-Requested-By", LivySessionService.USER);
+        headers.put("Accept", "application/json");
+
+        log.debug("********** submitAndHandleJob() Submitting Job to Spark 
via: " + statementUrl);
+        try {
+            JSONObject jobInfo = readJSONObjectFromUrlPOST(statementUrl, 
livySessionService, headers, payload);
+            log.debug("********** submitAndHandleJob() Job Info: " + jobInfo);
+            String statementId = String.valueOf(jobInfo.getInt("id"));
+            statementUrl = statementUrl + "/" + statementId;
+            jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, 
headers);
+            String jobState = jobInfo.getString("state");
+
+            log.debug("********** submitAndHandleJob() New Job Info: " + 
jobInfo);
+            Thread.sleep(statusCheckInterval);
+            if (jobState.equalsIgnoreCase("available")) {
+                log.debug("********** submitAndHandleJob() Job status is: " + 
jobState + ". returning output...");
+                output = jobInfo.getJSONObject("output");
+            } else if (jobState.equalsIgnoreCase("running") || 
jobState.equalsIgnoreCase("waiting")) {
+                while (!jobState.equalsIgnoreCase("available")) {
+                    log.debug("********** submitAndHandleJob() Job status is: 
" + jobState + ". Waiting for job to complete...");
+                    Thread.sleep(statusCheckInterval);
+                    jobInfo = readJSONObjectFromUrl(statementUrl, 
livySessionService, headers);
+                    jobState = jobInfo.getString("state");
+                }
+                output = jobInfo.getJSONObject("output");
+            } else if (jobState.equalsIgnoreCase("error")
+                    || jobState.equalsIgnoreCase("cancelled")
+                    || jobState.equalsIgnoreCase("cancelling")) {
+                log.debug("********** Job status is: " + jobState + ". Job did 
not complete due to error or has been cancelled. Check SparkUI for details.");
+                throw new IOException(jobState);
+            }
+        } catch (JSONException | InterruptedException e) {
+            throw new IOException(e);
+        }
+        return output;
+    }
+
+    private JSONObject readJSONObjectFromUrlPOST(String urlString, 
LivySessionService livySessionService, Map<String, String> headers, String 
payload)
+            throws IOException, JSONException {
+
+        HttpURLConnection connection = 
livySessionService.getConnection(urlString);
+        connection.setRequestMethod("POST");
+        connection.setDoOutput(true);
+
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+
+        OutputStream os = connection.getOutputStream();
+        os.write(payload.getBytes());
+        os.flush();
+
+        if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && 
connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
+            throw new RuntimeException("Failed : HTTP error code : " + 
connection.getResponseCode() + " : " + connection.getResponseMessage());
+        }
+
+        InputStream content = connection.getInputStream();
+        BufferedReader rd = new BufferedReader(new InputStreamReader(content, 
StandardCharsets.UTF_8));
+        String jsonText = IOUtils.toString(rd);
+        return new JSONObject(jsonText);
+    }
+
+    private JSONObject readJSONObjectFromUrl(final String urlString, 
LivySessionService livySessionService, final Map<String, String> headers)
+            throws IOException, JSONException {
+
+        HttpURLConnection connection = 
livySessionService.getConnection(urlString);
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+        connection.setRequestMethod("GET");
+        connection.setDoOutput(true);
+        InputStream content = connection.getInputStream();
+        BufferedReader rd = new BufferedReader(new InputStreamReader(content, 
StandardCharsets.UTF_8));
+        String jsonText = IOUtils.toString(rd);
+        return new JSONObject(jsonText);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..8de81e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.processors.livy.ExecuteSparkInteractive
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
new file mode 100644
index 0000000..3a2c67a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+public class ExecuteSparkInteractiveTestBase {
+
+    public static class LivyAPIHandler extends AbstractHandler {
+
+        int session1Requests = 0;
+
+        @Override
+        public void handle(String target, Request baseRequest, 
HttpServletRequest request, HttpServletResponse response) throws IOException, 
ServletException {
+            baseRequest.setHandled(true);
+
+            response.setStatus(200);
+
+            if ("GET".equalsIgnoreCase(request.getMethod())) {
+
+                String responseBody = "{}";
+                response.setContentType("application/json");
+
+                if ("/sessions".equalsIgnoreCase(target)) {
+                    responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": 
\"spark\", \"state\": \"idle\"}]}";
+                } else if (target.startsWith("/sessions/") && 
!target.contains("statement")) {
+                    responseBody = "{\"id\": 1, \"kind\": \"spark\", 
\"state\": \"idle\"}";
+
+                } else if 
("/sessions/1/statements/7".equalsIgnoreCase(target)) {
+                    switch (session1Requests) {
+                        case 0:
+                            responseBody = "{\"state\": \"waiting\"}";
+                            break;
+                        case 1:
+                            responseBody = "{\"state\": \"running\"}";
+                            break;
+                        case 2:
+                            responseBody = "{\"state\": \"available\", 
\"output\": {\"data\": {\"text/plain\": \"Hello world\"}}}";
+                            break;
+                        default:
+                            responseBody = "{\"state\": \"error\"}";
+                            break;
+                    }
+                    session1Requests++;
+                }
+
+                response.setContentLength(responseBody.length());
+
+                try (PrintWriter writer = response.getWriter()) {
+                    writer.print(responseBody);
+                    writer.flush();
+                }
+
+            } else if ("POST".equalsIgnoreCase(request.getMethod())) {
+
+                String responseBody = "{}";
+                response.setContentType("application/json");
+
+                if ("/sessions".equalsIgnoreCase(target)) {
+                    responseBody = "{\"id\": 1, \"kind\": \"spark\", 
\"state\": \"idle\"}";
+                } else if ("/sessions/1/statements".equalsIgnoreCase(target)) {
+                    responseBody = "{\"id\": 7}";
+                }
+
+                response.setContentLength(responseBody.length());
+
+                try (PrintWriter writer = response.getWriter()) {
+                    writer.print(responseBody);
+                    writer.flush();
+                }
+
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
new file mode 100644
index 0000000..4219783
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import org.apache.nifi.controller.livy.LivySessionController;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.jetty.server.Handler;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TestExecuteSparkInteractive extends 
ExecuteSparkInteractiveTestBase {
+
+    public static TestServer server;
+    public static String url;
+
+    public TestRunner runner;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        // useful for verbose logging output
+        // don't commit this with this property enabled, or any 'mvn test' 
will be really verbose
+        // 
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", 
"debug");
+
+        // create a Jetty server on a random port
+        server = createServer();
+        server.startServer();
+
+        // this is the base url with the random port
+        url = server.getUrl();
+    }
+
+    public void addHandler(Handler handler) {
+        server.addHandler(handler);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        server.shutdownServer();
+    }
+
+    @Before
+    public void before() throws Exception {
+        runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
+        LivySessionController livyControllerService = new 
LivySessionController();
+        runner.addControllerService("livyCS", livyControllerService);
+        runner.setProperty(livyControllerService, 
LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, 
url.lastIndexOf(":")));
+        runner.setProperty(livyControllerService, 
LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1));
+        runner.enableControllerService(livyControllerService);
+        runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, 
"livyCS");
+
+        server.clearHandlers();
+    }
+
+    @After
+    public void after() {
+        runner.shutdown();
+    }
+
+    private static TestServer createServer() throws IOException {
+        return new TestServer();
+    }
+
+    @Test
+    public void testSparkSession() throws Exception {
+
+        addHandler(new LivyAPIHandler());
+
+        runner.enqueue("print \"hello world\"");
+        runner.run();
+        List<MockFlowFile> waitingFlowfiles = 
runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+        while (!waitingFlowfiles.isEmpty()) {
+            Thread.sleep(1000);
+            runner.clearTransferState();
+            runner.enqueue("print \"hello world\"");
+            runner.run();
+            waitingFlowfiles = 
runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+        }
+        runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
new file mode 100644
index 0000000..e732617
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import org.apache.nifi.controller.livy.LivySessionController;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.jetty.server.Handler;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestExecuteSparkInteractiveSSL extends 
ExecuteSparkInteractiveTestBase {
+
+    private static Map<String, String> sslProperties;
+
+    public static TestServer server;
+    public static String url;
+
+    public TestRunner runner;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        // useful for verbose logging output
+        // don't commit this with this property enabled, or any 'mvn test' 
will be really verbose
+        // 
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", 
"debug");
+
+        // create the SSL properties, which basically store keystore / truststore information
+        // this is used by the StandardSSLContextService and the Jetty Server
+        sslProperties = createSslProperties();
+
+        // create a Jetty server on a random port
+        server = createServer();
+        server.startServer();
+
+        // Allow time for the server to start
+        Thread.sleep(1000);
+
+        // this is the base url with the random port
+        url = server.getSecureUrl();
+    }
+
+    public void addHandler(Handler handler) {
+        server.addHandler(handler);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        server.shutdownServer();
+    }
+
+    @Before
+    public void before() throws Exception {
+        runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
+
+        final StandardSSLContextService sslService = new 
StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslService, sslProperties);
+        runner.enableControllerService(sslService);
+
+        // Allow time for the controller service to fully initialize
+        Thread.sleep(500);
+
+        LivySessionController livyControllerService = new 
LivySessionController();
+        runner.addControllerService("livyCS", livyControllerService);
+        runner.setProperty(livyControllerService, 
LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, 
url.lastIndexOf(":")));
+        runner.setProperty(livyControllerService, 
LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1));
+        runner.setProperty(livyControllerService, 
LivySessionController.SSL_CONTEXT_SERVICE, "ssl-context");
+        runner.enableControllerService(livyControllerService);
+
+        runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, 
"livyCS");
+
+        server.clearHandlers();
+    }
+
+    @After
+    public void after() {
+        runner.shutdown();
+    }
+
+    private static TestServer createServer() throws IOException {
+        return new TestServer(sslProperties);
+    }
+
+    @Test
+    public void testSslSparkSession() throws Exception {
+        addHandler(new LivyAPIHandler());
+
+        runner.enqueue("print \"hello world\"");
+        runner.run();
+        List<MockFlowFile> waitingFlowfiles = 
runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+        while (!waitingFlowfiles.isEmpty()) {
+            Thread.sleep(1000);
+            runner.clearTransferState();
+            runner.enqueue("print \"hello world\"");
+            runner.run();
+            waitingFlowfiles = 
runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT);
+        }
+        runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1);
+    }
+
+    private static Map<String, String> createSslProperties() {
+        final Map<String, String> map = new HashMap<>();
+        map.put(StandardSSLContextService.KEYSTORE.getName(), 
"src/test/resources/localhost-ks.jks");
+        map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), 
"localtest");
+        map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+        map.put(StandardSSLContextService.TRUSTSTORE.getName(), 
"src/test/resources/localhost-ts.jks");
+        map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), 
"localtest");
+        map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java
new file mode 100644
index 0000000..396df07
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.util.Map;
+
+/**
+ * Test server to assist with unit tests that require a server to be stood up.
+ *
+ * <p>Wraps an embedded Jetty {@link Server} that always has a plain HTTP connector and,
+ * when SSL properties are supplied, an additional HTTPS connector. Both connectors are
+ * configured with port 0, and the port actually bound is read back from the connector
+ * once the server has been started.
+ */
+public class TestServer {
+
+    /** Optional key in the SSL properties map controlling whether client authentication is required. */
+    public static final String NEED_CLIENT_AUTH = "clientAuth";
+
+    private Server jetty;
+    // set to true only once a secure connector has been added
+    private boolean secure = false;
+
+    /**
+     * Creates the test server with only the unsecured (HTTP) connector.
+     */
+    public TestServer() {
+        createServer(null);
+    }
+
+    /**
+     * Creates the test server.
+     *
+     * @param sslProperties SSLProps to be used in the secure connection. The keys should use the
+     *                      StandardSSLContextService property names. When {@code null}, no secure
+     *                      connector is created.
+     */
+    public TestServer(final Map<String, String> sslProperties) {
+        createServer(sslProperties);
+    }
+
+    /**
+     * Instantiates Jetty, registers the connectors and installs an empty handler collection.
+     *
+     * @param sslProperties SSL settings for the secure connector, or {@code null} for HTTP only
+     */
+    private void createServer(final Map<String, String> sslProperties) {
+        jetty = new Server();
+
+        // create the unsecure connector; it is added first, so it sits at connector index 0
+        createConnector();
+
+        // create the secure connector if sslProperties are specified (connector index 1)
+        if (sslProperties != null) {
+            createSecureConnector(sslProperties);
+        }
+
+        // NOTE(review): 'true' is presumably HandlerCollection's mutable-when-running flag,
+        // which lets handlers be added/removed after start - confirm for the Jetty version in use
+        jetty.setHandler(new HandlerCollection(true));
+    }
+
+    /**
+     * Creates the http connection
+     */
+    private void createConnector() {
+        final ServerConnector http = new ServerConnector(jetty);
+        http.setPort(0);
+        // Severely taxed environments may have significant delays when executing.
+        http.setIdleTimeout(30000L);
+        jetty.addConnector(http);
+    }
+
+    /**
+     * Creates the https connection from the supplied SSL settings and marks the server as secure.
+     *
+     * <p>Keystore and truststore settings are applied only when the corresponding path entry is
+     * present. Client authentication is required unless {@link #NEED_CLIENT_AUTH} is present,
+     * in which case its value is parsed as a boolean.
+     *
+     * @param sslProperties SSL settings keyed by the StandardSSLContextService property names
+     */
+    private void createSecureConnector(final Map<String, String> sslProperties) {
+        SslContextFactory ssl = new SslContextFactory();
+
+        if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
+            ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
+            ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
+            ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+        }
+
+        if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+            ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
+            ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
+            ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+        }
+
+        // client auth defaults to required when the caller did not say otherwise
+        final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+        if (clientAuth == null) {
+            ssl.setNeedClientAuth(true);
+        } else {
+            ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
+        }
+
+        // build the connector
+        final ServerConnector https = new ServerConnector(jetty, ssl);
+
+        // set host and port
+        https.setPort(0);
+        // Severely taxed environments may have significant delays when executing.
+        https.setIdleTimeout(30000L);
+
+        // add the connector
+        jetty.addConnector(https);
+
+        // mark secure as enabled
+        secure = true;
+    }
+
+    /**
+     * Removes every handler currently registered with the server.
+     */
+    public void clearHandlers() {
+        HandlerCollection hc = (HandlerCollection) jetty.getHandler();
+        Handler[] ha = hc.getHandlers();
+        if (ha != null) {
+            for (Handler h : ha) {
+                hc.removeHandler(h);
+            }
+        }
+    }
+
+    /**
+     * Registers an additional handler with the server.
+     *
+     * @param handler the handler to add
+     */
+    public void addHandler(Handler handler) {
+        ((HandlerCollection) jetty.getHandler()).addHandler(handler);
+    }
+
+    /**
+     * Starts the embedded Jetty server.
+     *
+     * @throws Exception if Jetty fails to start
+     */
+    public void startServer() throws Exception {
+        jetty.start();
+    }
+
+    /**
+     * Stops and then destroys the embedded Jetty server.
+     *
+     * @throws Exception if Jetty fails to stop
+     */
+    public void shutdownServer() throws Exception {
+        jetty.stop();
+        jetty.destroy();
+    }
+
+    /**
+     * Returns the local port of the HTTP connector (connector index 0).
+     *
+     * @throws IllegalStateException if the server has not been started
+     */
+    private int getPort() {
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+    }
+
+    /**
+     * Returns the local port of the HTTPS connector (connector index 1).
+     *
+     * @throws IllegalStateException if the server has not been started
+     */
+    private int getSecurePort() {
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
+    }
+
+    /**
+     * Returns the base URL of the HTTP connector.
+     *
+     * @return {@code http://localhost:<port>}
+     * @throws IllegalStateException if the server has not been started
+     */
+    public String getUrl() {
+        return "http://localhost:" + getPort();
+    }
+
+    /**
+     * Returns the base URL of the HTTPS connector.
+     *
+     * @return {@code https://localhost:<port>}, or {@code null} when no secure connector was configured
+     * @throws IllegalStateException if a secure connector exists but the server has not been started
+     */
+    public String getSecureUrl() {
+        String url = null;
+        if (secure) {
+            url = "https://localhost:" + getSecurePort();
+        }
+        return url;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
new file mode 100755
index 0000000..df36197
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks
new file mode 100755
index 0000000..7824378
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/pom.xml 
b/nifi-nar-bundles/nifi-spark-bundle/pom.xml
new file mode 100644
index 0000000..91b0ce8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/pom.xml
@@ -0,0 +1,85 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-spark-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-livy-nar</module>
+        <module>nifi-livy-controller-service-api-nar</module>
+        <module>nifi-livy-controller-service-api</module>
+        <module>nifi-livy-controller-service</module>
+        <module>nifi-livy-processors</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-core-asl</artifactId>
+                <version>1.9.13</version>
+            </dependency>
+            <dependency>
+                <groupId>org.codehaus.jackson</groupId>
+                <artifactId>jackson-mapper-asl</artifactId>
+                <version>1.9.13</version>
+            </dependency>
+            <dependency>
+                <groupId>org.json</groupId>
+                <artifactId>json</artifactId>
+                <version>20160810</version>
+            </dependency>
+            <dependency>
+                <groupId>org.codehaus.jettison</groupId>
+                <artifactId>jettison</artifactId>
+                <version>1.3.8</version>
+            </dependency>
+            <dependency>
+                <groupId>commons-configuration</groupId>
+                <artifactId>commons-configuration</artifactId>
+                <version>1.10</version>
+            </dependency>
+            <dependency>
+                <groupId>com.sun.jersey</groupId>
+                <artifactId>jersey-client</artifactId>
+                <version>1.19.1</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 0462e72..f462883 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -89,6 +89,7 @@
         <module>nifi-extension-utils</module>
         <module>nifi-redis-bundle</module>
         <module>nifi-metrics-reporting-bundle</module>
+        <module>nifi-spark-bundle</module>
   </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 11caa40..3f573b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1462,7 +1462,19 @@
                 <version>1.5.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
-               <dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-livy-controller-service-api-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-livy-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>1.5.0-SNAPSHOT</version>

Reply via email to