http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java new file mode 100644 index 0000000..c25e98f --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.livy.rsc;

import org.apache.livy.Job;
import org.apache.livy.rsc.rpc.RpcDispatcher;

/**
 * Defines the RPC message types exchanged between the Livy server and the remote
 * Spark driver.
 *
 * Each nested class is a plain field holder. The no-argument constructors exist so
 * the RPC layer can instantiate messages during deserialization; field names and
 * types are effectively the wire contract, so change them with care.
 */
public abstract class BaseProtocol extends RpcDispatcher {

  /** Request to cancel the job identified by {@code id}. */
  protected static class CancelJob {

    public final String id;

    CancelJob(String id) {
      this.id = id;
    }

    // For deserialization only.
    CancelJob() {
      this(null);
    }

  }

  /** Request to end the remote session. Carries no payload. */
  protected static class EndSession {

  }

  /** Reports an error to the remote end, carrying the stack trace as a string. */
  protected static class Error {

    // Full stack trace of the cause, or the empty string when there is none.
    public final String cause;

    public Error(Throwable cause) {
      if (cause == null) {
        this.cause = "";
      } else {
        this.cause = Utils.stackTraceAsString(cause);
      }
    }

    // For deserialization only.
    public Error() {
      this(null);
    }

  }

  /**
   * Request to run a pre-serialized ("bypass") job. The job bytes are opaque to
   * the protocol layer. NOTE(review): "synchronous" presumably selects blocking
   * execution on the driver side — confirm against the driver's handler.
   */
  public static class BypassJobRequest {

    public final String id;
    public final byte[] serializedJob;
    public final boolean synchronous;

    public BypassJobRequest(String id, byte[] serializedJob, boolean synchronous) {
      this.id = id;
      this.serializedJob = serializedJob;
      this.synchronous = synchronous;
    }

    // For deserialization only.
    public BypassJobRequest() {
      this(null, null, false);
    }

  }

  /** Request for the status of the bypass job identified by {@code id}. */
  protected static class GetBypassJobStatus {

    public final String id;

    public GetBypassJobStatus(String id) {
      this.id = id;
    }

    // For deserialization only.
    public GetBypassJobStatus() {
      this(null);
    }

  }

  /** Request to run a job, keyed by a caller-supplied id. */
  protected static class JobRequest<T> {

    public final String id;
    public final Job<T> job;

    public JobRequest(String id, Job<T> job) {
      this.id = id;
      this.job = job;
    }

    // For deserialization only.
    public JobRequest() {
      this(null, null);
    }

  }

  /**
   * Result of a finished job, keyed by the job's id: either a value or a
   * stringified stack trace.
   */
  protected static class JobResult<T> {

    public final String id;
    public final T result;
    // Stack trace of the failure, or null if the job succeeded.
    public final String error;

    public JobResult(String id, T result, Throwable error) {
      this.id = id;
      this.result = result;
      this.error = error != null ? Utils.stackTraceAsString(error) : null;
    }

    // For deserialization only.
    public JobResult() {
      this(null, null, null);
    }

  }

  /** Notification that the job identified by {@code id} has started running. */
  protected static class JobStarted {

    public final String id;

    public JobStarted(String id) {
      this.id = id;
    }

    // For deserialization only.
    public JobStarted() {
      this(null);
    }

  }

  /** Request to run a job whose result is delivered via the RPC reply itself. */
  protected static class SyncJobRequest<T> {

    public final Job<T> job;

    public SyncJobRequest(Job<T> job) {
      this.job = job;
    }

    // For deserialization only.
    public SyncJobRequest() {
      this(null);
    }

  }

  /** Address (host/port) at which the remote driver can be reached. */
  public static class RemoteDriverAddress {

    public final String host;
    public final int port;

    public RemoteDriverAddress(String host, int port) {
      this.host = host;
      this.port = port;
    }

    // For deserialization only.
    public RemoteDriverAddress() {
      this(null, -1);
    }

  }

  /** Request to execute a snippet of code in the REPL. */
  public static class ReplJobRequest {

    public final String code;

    public ReplJobRequest(String code) {
      this.code = code;
    }

    // For deserialization only.
    public ReplJobRequest() {
      this(null);
    }
  }

  /**
   * Request for REPL job results. The no-arg form sets {@code allResults} and asks
   * for everything; the two-arg form describes a window via {@code from}/{@code size}
   * — confirm the exact window semantics against the driver's handler.
   */
  public static class GetReplJobResults {
    public boolean allResults;
    public Integer from, size;

    public GetReplJobResults(Integer from, Integer size) {
      this.allResults = false;
      this.from = from;
      this.size = size;
    }

    public GetReplJobResults() {
      this.allResults = true;
      from = null;
      size = null;
    }
  }

  /** Notification of the REPL's current state, carried as an opaque string. */
  protected static class ReplState {

    public final String state;

    public ReplState(String state) {
      this.state = state;
    }

    // For deserialization only.
    public ReplState() {
      this(null);
    }
  }

  /** Request to cancel the REPL job identified by a numeric id. */
  public static class CancelReplJobRequest {
    public final int id;

    public CancelReplJobRequest(int id) {
      this.id = id;
    }

    // For deserialization only.
    public CancelReplJobRequest() {
      this(-1);
    }
  }

  /** Reports a failure to initialize the remote context; carries the stack trace. */
  public static class InitializationError {

    public final String stackTrace;

    public InitializationError(String stackTrace) {
      this.stackTrace = stackTrace;
    }

    // For deserialization only.
    public InitializationError() {
      this(null);
    }

  }

}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java b/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java new file mode 100644 index 0000000..f62c14d --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/BypassJobStatus.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.livy.rsc;

import org.apache.livy.JobHandle;

/**
 * Snapshot of the status of a "bypass" job (a job submitted in pre-serialized
 * form; see BaseProtocol.BypassJobRequest).
 */
public class BypassJobStatus {

  // Current lifecycle state of the job.
  public final JobHandle.State state;
  // Serialized result, if any; opaque bytes at this layer.
  public final byte[] result;
  // Error description, if any — presumably a stack trace string; confirm at the
  // producer side.
  public final String error;

  public BypassJobStatus(JobHandle.State state, byte[] result, String error) {
    this.state = state;
    this.result = result;
    this.error = error;
  }

  // For deserialization only.
  BypassJobStatus() {
    this(null, null, null);
  }

}
package org.apache.livy.rsc;

/**
 * Information about a running RSC instance: where its RPC endpoint listens and
 * the credentials a client must present to connect.
 */
class ContextInfo {

  // Host the remote context's RPC server listens on.
  final String remoteAddress;
  // Port the remote context's RPC server listens on.
  final int remotePort;
  // Client identity registered with the RPC server.
  final String clientId;
  // Shared secret used to authenticate the client connection.
  final String secret;

  ContextInfo(String remoteAddress, int remotePort, String clientId, String secret) {
    this.remoteAddress = remoteAddress;
    this.remotePort = remotePort;
    this.clientId = clientId;
    this.secret = secret;
  }

}
+ */ + +package org.apache.livy.rsc; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.attribute.PosixFilePermission.*; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.Promise; +import org.apache.spark.launcher.SparkLauncher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.livy.client.common.TestUtils; +import org.apache.livy.rsc.driver.RSCDriverBootstrapper; +import org.apache.livy.rsc.rpc.Rpc; +import org.apache.livy.rsc.rpc.RpcDispatcher; +import org.apache.livy.rsc.rpc.RpcServer; + +import static org.apache.livy.rsc.RSCConf.Entry.*; + +/** + * Encapsulates code needed to launch a new Spark context and collect information about how + * to establish a client connection to it. 
+ */ +class ContextLauncher { + + private static final Logger LOG = LoggerFactory.getLogger(ContextLauncher.class); + private static final AtomicInteger CHILD_IDS = new AtomicInteger(); + + private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode"; + private static final String SPARK_JARS_KEY = "spark.jars"; + private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives"; + private static final String SPARK_HOME_ENV = "SPARK_HOME"; + + static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf) + throws IOException { + ContextLauncher launcher = new ContextLauncher(factory, conf); + return new DriverProcessInfo(launcher.promise, launcher.child.child); + } + + private final Promise<ContextInfo> promise; + private final ScheduledFuture<?> timeout; + private final String clientId; + private final String secret; + private final ChildProcess child; + private final RSCConf conf; + private final RSCClientFactory factory; + + private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOException { + this.promise = factory.getServer().getEventLoopGroup().next().newPromise(); + this.clientId = UUID.randomUUID().toString(); + this.secret = factory.getServer().createSecret(); + this.conf = conf; + this.factory = factory; + + final RegistrationHandler handler = new RegistrationHandler(); + try { + factory.getServer().registerClient(clientId, secret, handler); + String replMode = conf.get("repl"); + boolean repl = replMode != null && replMode.equals("true"); + + conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress()); + conf.set(LAUNCHER_PORT, factory.getServer().getPort()); + conf.set(CLIENT_ID, clientId); + conf.set(CLIENT_SECRET, secret); + + Utils.addListener(promise, new FutureListener<ContextInfo>() { + @Override + public void onFailure(Throwable error) throws Exception { + // If promise is cancelled or failed, make sure spark-submit is not leaked. 
+ if (child != null) { + child.kill(); + } + } + }); + + this.child = startDriver(conf, promise); + + // Set up a timeout to fail the promise if we don't hear back from the context + // after a configurable timeout. + Runnable timeoutTask = new Runnable() { + @Override + public void run() { + connectTimeout(handler); + } + }; + this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask, + conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); + } catch (Exception e) { + dispose(true); + throw Utils.propagate(e); + } + } + + private void connectTimeout(RegistrationHandler handler) { + if (promise.tryFailure(new TimeoutException("Timed out waiting for context to start."))) { + handler.dispose(); + } + dispose(true); + } + + private void dispose(boolean forceKill) { + factory.getServer().unregisterClient(clientId); + try { + if (child != null) { + if (forceKill) { + child.kill(); + } else { + child.detach(); + } + } + } finally { + factory.unref(); + } + } + + private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise) + throws IOException { + String livyJars = conf.get(LIVY_JARS); + if (livyJars == null) { + String livyHome = System.getenv("LIVY_HOME"); + Utils.checkState(livyHome != null, + "Need one of LIVY_HOME or %s set.", LIVY_JARS.key()); + File rscJars = new File(livyHome, "rsc-jars"); + if (!rscJars.isDirectory()) { + rscJars = new File(livyHome, "rsc/target/jars"); + } + Utils.checkState(rscJars.isDirectory(), + "Cannot find 'client-jars' directory under LIVY_HOME."); + List<String> jars = new ArrayList<>(); + for (File f : rscJars.listFiles()) { + jars.add(f.getAbsolutePath()); + } + livyJars = Utils.join(jars, ","); + } + merge(conf, SPARK_JARS_KEY, livyJars, ","); + + String kind = conf.get(SESSION_KIND); + if ("sparkr".equals(kind)) { + merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ","); + } else if ("pyspark".equals(kind)) { + merge(conf, "spark.submit.pyFiles", 
conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ","); + } + + // Disable multiple attempts since the RPC server doesn't yet support multiple + // connections for the same registered app. + conf.set("spark.yarn.maxAppAttempts", "1"); + + // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots + // of "small" Java processes lingering on the Livy server node. + conf.set("spark.yarn.submit.waitAppCompletion", "false"); + + if (!conf.getBoolean(CLIENT_IN_PROCESS) && + // For tests which doesn't shutdown RscDriver gracefully, JaCoCo exec isn't dumped properly. + // Disable JaCoCo for this case. + !conf.getBoolean(TEST_STUCK_END_SESSION)) { + // For testing; propagate jacoco settings so that we also do coverage analysis + // on the launched driver. We replace the name of the main file ("main.exec") + // so that we don't end up fighting with the main test launcher. + String jacocoArgs = TestUtils.getJacocoArgs(); + if (jacocoArgs != null) { + merge(conf, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jacocoArgs, " "); + } + } + + final File confFile = writeConfToFile(conf); + + if (ContextLauncher.mockSparkSubmit != null) { + LOG.warn("!!!! Using mock spark-submit. !!!!"); + return new ChildProcess(conf, promise, ContextLauncher.mockSparkSubmit, confFile); + } else if (conf.getBoolean(CLIENT_IN_PROCESS)) { + // Mostly for testing things quickly. Do not do this in production. + LOG.warn("!!!! Running remote driver in-process. !!!!"); + Runnable child = new Runnable() { + @Override + public void run() { + try { + RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() }); + } catch (Exception e) { + throw Utils.propagate(e); + } + } + }; + return new ChildProcess(conf, promise, child, confFile); + } else { + final SparkLauncher launcher = new SparkLauncher(); + + // Spark 1.x does not support specifying deploy mode in conf and needs special handling. 
+ String deployMode = conf.get(SPARK_DEPLOY_MODE); + if (deployMode != null) { + launcher.setDeployMode(deployMode); + } + + launcher.setSparkHome(System.getenv(SPARK_HOME_ENV)); + launcher.setAppResource("spark-internal"); + launcher.setPropertiesFile(confFile.getAbsolutePath()); + launcher.setMainClass(RSCDriverBootstrapper.class.getName()); + + if (conf.get(PROXY_USER) != null) { + launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER)); + } + + return new ChildProcess(conf, promise, launcher.launch(), confFile); + } + } + + private static void merge(RSCConf conf, String key, String livyConf, String sep) { + String confValue = Utils.join(Arrays.asList(livyConf, conf.get(key)), sep); + conf.set(key, confValue); + } + + /** + * Write the configuration to a file readable only by the process's owner. Livy properties + * are written with an added prefix so that they can be loaded using SparkConf on the driver + * side. + * + * The default Spark configuration (from either SPARK_HOME or SPARK_CONF_DIR) is merged into + * the user configuration, so that defaults set by Livy's admin take effect when not overridden + * by the user. + */ + private static File writeConfToFile(RSCConf conf) throws IOException { + Properties confView = new Properties(); + for (Map.Entry<String, String> e : conf) { + String key = e.getKey(); + if (!key.startsWith(RSCConf.SPARK_CONF_PREFIX)) { + key = RSCConf.LIVY_SPARK_PREFIX + key; + } + confView.setProperty(key, e.getValue()); + } + + // Load the default Spark configuration. 
+ String confDir = System.getenv("SPARK_CONF_DIR"); + if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) { + confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf"; + } + + if (confDir != null) { + File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf"); + if (sparkDefaults.isFile()) { + Properties sparkConf = new Properties(); + Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), UTF_8); + try { + sparkConf.load(r); + } finally { + r.close(); + } + + for (String key : sparkConf.stringPropertyNames()) { + if (!confView.containsKey(key)) { + confView.put(key, sparkConf.getProperty(key)); + } + } + } + } + + File file = File.createTempFile("livyConf", ".properties"); + Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE)); + //file.deleteOnExit(); + + Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8); + try { + confView.store(writer, "Livy App Context Configuration"); + } finally { + writer.close(); + } + + return file; + } + + + private class RegistrationHandler extends BaseProtocol + implements RpcServer.ClientCallback { + + volatile RemoteDriverAddress driverAddress; + + private Rpc client; + + @Override + public RpcDispatcher onNewClient(Rpc client) { + LOG.debug("New RPC client connected from {}.", client.getChannel()); + this.client = client; + return this; + } + + @Override + public void onSaslComplete(Rpc client) { + } + + void dispose() { + if (client != null) { + client.close(); + } + } + + private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { + ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret); + if (promise.trySuccess(info)) { + timeout.cancel(true); + LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(), + msg.host, msg.port); + } else { + LOG.warn("Connection established but promise is already finalized."); + } + + ctx.executor().submit(new Runnable() { + @Override + public 
void run() { + dispose(); + ContextLauncher.this.dispose(false); + } + }); + } + + } + + private static class ChildProcess { + + private final RSCConf conf; + private final Promise<?> promise; + private final Process child; + private final Thread monitor; + private final File confFile; + + public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) { + this.conf = conf; + this.promise = promise; + this.monitor = monitor(child, CHILD_IDS.incrementAndGet()); + this.child = null; + this.confFile = confFile; + } + + public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, File confFile) { + int childId = CHILD_IDS.incrementAndGet(); + this.conf = conf; + this.promise = promise; + this.child = childProc; + this.confFile = confFile; + + Runnable monitorTask = new Runnable() { + @Override + public void run() { + try { + int exitCode = child.waitFor(); + if (exitCode != 0) { + LOG.warn("Child process exited with code {}.", exitCode); + fail(new IOException(String.format("Child process exited with code %d.", exitCode))); + } + } catch (InterruptedException ie) { + LOG.warn("Waiting thread interrupted, killing child process."); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + LOG.warn("Exception while waiting for child process.", e); + } + } + }; + this.monitor = monitor(monitorTask, childId); + } + + private void fail(Throwable error) { + promise.tryFailure(error); + } + + public void kill() { + if (child != null) { + child.destroy(); + } + monitor.interrupt(); + detach(); + + if (!monitor.isAlive()) { + return; + } + + // Last ditch effort. 
+ if (monitor.isAlive()) { + LOG.warn("Timed out shutting down remote driver, interrupting..."); + monitor.interrupt(); + } + } + + public void detach() { + try { + monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT)); + } catch (InterruptedException ie) { + LOG.debug("Interrupted before driver thread was finished."); + } + } + + private Thread monitor(final Runnable task, int childId) { + Runnable wrappedTask = new Runnable() { + @Override + public void run() { + try { + task.run(); + } finally { + confFile.delete(); + } + } + }; + Thread thread = new Thread(wrappedTask); + thread.setDaemon(true); + thread.setName("ContextLauncher-" + childId); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Child task threw exception.", e); + fail(e); + } + }); + thread.start(); + return thread; + } + } + + // Just for testing. + static Process mockSparkSubmit; + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java b/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java new file mode 100644 index 0000000..ddc991c --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/DriverProcessInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc; + +import io.netty.util.concurrent.Promise; + +/** + * Information about driver process and @{@link ContextInfo} + */ +public class DriverProcessInfo { + + private Promise<ContextInfo> contextInfo; + private transient Process driverProcess; + + public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) { + this.contextInfo = contextInfo; + this.driverProcess = driverProcess; + } + + public Promise<ContextInfo> getContextInfo() { + return contextInfo; + } + + public Process getDriverProcess() { + return driverProcess; + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java b/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java new file mode 100644 index 0000000..9b99eae --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/FutureListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
package org.apache.livy.rsc;

/** A simplified future listener for netty futures. See Utils.addListener(). */
public abstract class FutureListener<T> {

  /** Invoked with the future's value on success; default is a no-op. */
  public void onSuccess(T result) throws Exception { }

  /** Invoked with the future's failure cause; default is a no-op. */
  public void onFailure(Throwable error) throws Exception { }

}
+ */ + +package org.apache.livy.rsc; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.util.concurrent.Promise; + +import org.apache.livy.JobHandle; +import org.apache.livy.client.common.AbstractJobHandle; + +/** + * A handle to a submitted job. Allows for monitoring and controlling of the running remote job. + */ +class JobHandleImpl<T> extends AbstractJobHandle<T> { + + private final RSCClient client; + private final String jobId; + private final Promise<T> promise; + private volatile State state; + + JobHandleImpl(RSCClient client, Promise<T> promise, String jobId) { + this.client = client; + this.jobId = jobId; + this.promise = promise; + } + + /** Requests a running job to be cancelled. */ + @Override + public boolean cancel(boolean mayInterrupt) { + if (changeState(State.CANCELLED)) { + client.cancel(jobId); + promise.cancel(mayInterrupt); + return true; + } + return false; + } + + @Override + public T get() throws ExecutionException, InterruptedException { + return promise.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + return promise.get(timeout, unit); + } + + @Override + public boolean isCancelled() { + return promise.isCancelled(); + } + + @Override + public boolean isDone() { + return promise.isDone(); + } + + @Override + protected T result() { + return promise.getNow(); + } + + @Override + protected Throwable error() { + return promise.cause(); + } + + @SuppressWarnings("unchecked") + void setSuccess(Object result) { + // The synchronization here is not necessary, but tests depend on it. 
+ synchronized (listeners) { + promise.setSuccess((T) result); + changeState(State.SUCCEEDED); + } + } + + void setFailure(Throwable error) { + // The synchronization here is not necessary, but tests depend on it. + synchronized (listeners) { + promise.setFailure(error); + changeState(State.FAILED); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/PingJob.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/PingJob.java b/rsc/src/main/java/org/apache/livy/rsc/PingJob.java new file mode 100644 index 0000000..221f57f --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/PingJob.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +/** A job that can be used to check for liveness of the remote context. 
package org.apache.livy.rsc;

import org.apache.livy.Job;
import org.apache.livy.JobContext;

/** A job that can be used to check for liveness of the remote context. */
public class PingJob implements Job<Void> {

  /** Intentionally a no-op: completing at all shows the remote context is alive. */
  @Override
  public Void call(JobContext jc) {
    return null;
  }

}
+ */ + +package org.apache.livy.rsc; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.livy.Job; +import org.apache.livy.JobHandle; +import org.apache.livy.LivyClient; +import org.apache.livy.client.common.BufferUtils; +import org.apache.livy.rsc.driver.AddFileJob; +import org.apache.livy.rsc.driver.AddJarJob; +import org.apache.livy.rsc.rpc.Rpc; + +import static org.apache.livy.rsc.RSCConf.Entry.*; + +public class RSCClient implements LivyClient { + private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class); + private static final AtomicInteger EXECUTOR_GROUP_ID = new AtomicInteger(); + + private final RSCConf conf; + private final Promise<ContextInfo> contextInfoPromise; + private final Map<String, JobHandleImpl<?>> jobs; + private final ClientProtocol protocol; + private final Promise<Rpc> driverRpc; + private final int executorGroupId; + private final EventLoopGroup eventLoopGroup; + private final Promise<URI> serverUriPromise; + + private ContextInfo contextInfo; + private Process driverProcess; + private volatile boolean isAlive; + private volatile String replState; + + RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException { + this.conf = conf; + this.contextInfoPromise = ctx; + this.driverProcess = driverProcess; + this.jobs = new ConcurrentHashMap<>(); + 
this.protocol = new ClientProtocol(); + this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise(); + this.executorGroupId = EXECUTOR_GROUP_ID.incrementAndGet(); + this.eventLoopGroup = new NioEventLoopGroup( + conf.getInt(RPC_MAX_THREADS), + Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d")); + this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise(); + + Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() { + @Override + public void onSuccess(ContextInfo info) throws Exception { + connectToContext(info); + String url = String.format("rsc://%s:%s@%s:%d", + info.clientId, info.secret, info.remoteAddress, info.remotePort); + serverUriPromise.setSuccess(URI.create(url)); + } + + @Override + public void onFailure(Throwable error) { + connectionError(error); + serverUriPromise.setFailure(error); + } + }); + + isAlive = true; + } + + public boolean isAlive() { + return isAlive; + } + + public Process getDriverProcess() { + return driverProcess; + } + + private synchronized void connectToContext(final ContextInfo info) throws Exception { + this.contextInfo = info; + + try { + Promise<Rpc> promise = Rpc.createClient(conf, + eventLoopGroup, + info.remoteAddress, + info.remotePort, + info.clientId, + info.secret, + protocol); + Utils.addListener(promise, new FutureListener<Rpc>() { + @Override + public void onSuccess(Rpc rpc) throws Exception { + driverRpc.setSuccess(rpc); + Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() { + @Override + public void onSuccess(Void unused) { + if (isAlive) { + LOG.warn("Client RPC channel closed unexpectedly."); + try { + stop(false); + } catch (Exception e) { /* stop() itself prints warning. 
*/ } + } + } + }); + LOG.debug("Connected to context {} ({}, {}).", info.clientId, + rpc.getChannel(), executorGroupId); + } + + @Override + public void onFailure(Throwable error) throws Exception { + driverRpc.setFailure(error); + connectionError(error); + } + }); + } catch (Exception e) { + connectionError(e); + } + } + + private void connectionError(Throwable error) { + LOG.error("Failed to connect to context.", error); + try { + stop(false); + } catch (Exception e) { /* stop() itself prints warning. */ } + } + + private <T> io.netty.util.concurrent.Future<T> deferredCall(final Object msg, + final Class<T> retType) { + if (driverRpc.isSuccess()) { + try { + return driverRpc.get().call(msg, retType); + } catch (Exception ie) { + throw Utils.propagate(ie); + } + } + + // No driver RPC yet, so install a listener and return a promise that will be ready when + // the driver is up and the message is actually delivered. + final Promise<T> promise = eventLoopGroup.next().newPromise(); + final FutureListener<T> callListener = new FutureListener<T>() { + @Override + public void onSuccess(T value) throws Exception { + promise.setSuccess(value); + } + + @Override + public void onFailure(Throwable error) throws Exception { + promise.setFailure(error); + } + }; + + Utils.addListener(driverRpc, new FutureListener<Rpc>() { + @Override + public void onSuccess(Rpc rpc) throws Exception { + Utils.addListener(rpc.call(msg, retType), callListener); + } + + @Override + public void onFailure(Throwable error) throws Exception { + promise.setFailure(error); + } + }); + return promise; + } + + public Future<URI> getServerUri() { + return serverUriPromise; + } + + @Override + public <T> JobHandle<T> submit(Job<T> job) { + return protocol.submit(job); + } + + @Override + public <T> Future<T> run(Job<T> job) { + return protocol.run(job); + } + + @Override + public synchronized void stop(boolean shutdownContext) { + if (isAlive) { + isAlive = false; + try { + 
this.contextInfoPromise.cancel(true); + + if (shutdownContext && driverRpc.isSuccess()) { + protocol.endSession(); + + // Because the remote context won't really reply to the end session message - + // since it closes the channel while handling it, we wait for the RPC's channel + // to close instead. + long stopTimeout = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT); + driverRpc.get().getChannel().closeFuture().get(stopTimeout, + TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + LOG.warn("Exception while waiting for end session reply.", e); + Utils.propagate(e); + } finally { + if (driverRpc.isSuccess()) { + try { + driverRpc.get().close(); + } catch (Exception e) { + LOG.warn("Error stopping RPC.", e); + } + } + + // Report failure for all pending jobs, so that clients can react. + for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) { + LOG.info("Failing pending job {} due to shutdown.", e.getKey()); + e.getValue().setFailure(new IOException("RSCClient instance stopped.")); + } + + eventLoopGroup.shutdownGracefully(); + } + if (contextInfo != null) { + LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId, + shutdownContext); + } + } + } + + @Override + public Future<?> uploadJar(File jar) { + throw new UnsupportedOperationException("Use addJar to add the jar to the remote context!"); + } + + @Override + public Future<?> addJar(URI uri) { + return submit(new AddJarJob(uri.toString())); + } + + @Override + public Future<?> uploadFile(File file) { + throw new UnsupportedOperationException("Use addFile to add the file to the remote context!"); + } + + @Override + public Future<?> addFile(URI uri) { + return submit(new AddFileJob(uri.toString())); + } + + public String bypass(ByteBuffer serializedJob, boolean sync) { + return protocol.bypass(serializedJob, sync); + } + + public Future<BypassJobStatus> getBypassJobStatus(String id) { + return protocol.getBypassJobStatus(id); + } + + public void cancel(String jobId) { + 
protocol.cancel(jobId); + } + + ContextInfo getContextInfo() { + return contextInfo; + } + + public Future<Integer> submitReplCode(String code) throws Exception { + return deferredCall(new BaseProtocol.ReplJobRequest(code), Integer.class); + } + + public void cancelReplCode(int statementId) throws Exception { + deferredCall(new BaseProtocol.CancelReplJobRequest(statementId), Void.class); + } + + public Future<ReplJobResults> getReplJobResults(Integer from, Integer size) throws Exception { + return deferredCall(new BaseProtocol.GetReplJobResults(from, size), ReplJobResults.class); + } + + public Future<ReplJobResults> getReplJobResults() throws Exception { + return deferredCall(new BaseProtocol.GetReplJobResults(), ReplJobResults.class); + } + + /** + * @return Return the repl state. If this's not connected to a repl session, it will return null. + */ + public String getReplState() { + return replState; + } + + private class ClientProtocol extends BaseProtocol { + + <T> JobHandleImpl<T> submit(Job<T> job) { + final String jobId = UUID.randomUUID().toString(); + Object msg = new JobRequest<T>(jobId, job); + + final Promise<T> promise = eventLoopGroup.next().newPromise(); + final JobHandleImpl<T> handle = new JobHandleImpl<T>(RSCClient.this, + promise, jobId); + jobs.put(jobId, handle); + + final io.netty.util.concurrent.Future<Void> rpc = deferredCall(msg, Void.class); + LOG.debug("Sending JobRequest[{}].", jobId); + + Utils.addListener(rpc, new FutureListener<Void>() { + @Override + public void onSuccess(Void unused) throws Exception { + handle.changeState(JobHandle.State.QUEUED); + } + + @Override + public void onFailure(Throwable error) throws Exception { + error.printStackTrace(); + promise.tryFailure(error); + } + }); + promise.addListener(new GenericFutureListener<Promise<T>>() { + @Override + public void operationComplete(Promise<T> p) { + if (jobId != null) { + jobs.remove(jobId); + } + if (p.isCancelled() && !rpc.isDone()) { + rpc.cancel(true); + } + } + }); 
+ return handle; + } + + @SuppressWarnings("unchecked") + <T> Future<T> run(Job<T> job) { + return (Future<T>) deferredCall(new SyncJobRequest(job), Object.class); + } + + String bypass(ByteBuffer serializedJob, boolean sync) { + String jobId = UUID.randomUUID().toString(); + Object msg = new BypassJobRequest(jobId, BufferUtils.toByteArray(serializedJob), sync); + deferredCall(msg, Void.class); + return jobId; + } + + Future<BypassJobStatus> getBypassJobStatus(String id) { + return deferredCall(new GetBypassJobStatus(id), BypassJobStatus.class); + } + + void cancel(String jobId) { + deferredCall(new CancelJob(jobId), Void.class); + } + + Future<?> endSession() { + return deferredCall(new EndSession(), Void.class); + } + + private void handle(ChannelHandlerContext ctx, InitializationError msg) { + LOG.warn("Error reported from remote driver: %s", msg.stackTrace); + } + + private void handle(ChannelHandlerContext ctx, JobResult msg) { + JobHandleImpl<?> handle = jobs.remove(msg.id); + if (handle != null) { + LOG.info("Received result for {}", msg.id); + // TODO: need a better exception for this. + Throwable error = msg.error != null ? 
new RuntimeException(msg.error) : null; + if (error == null) { + handle.setSuccess(msg.result); + } else { + handle.setFailure(error); + } + } else { + LOG.warn("Received result for unknown job {}", msg.id); + } + } + + private void handle(ChannelHandlerContext ctx, JobStarted msg) { + JobHandleImpl<?> handle = jobs.get(msg.id); + if (handle != null) { + handle.changeState(JobHandle.State.STARTED); + } else { + LOG.warn("Received event for unknown job {}", msg.id); + } + } + + private void handle(ChannelHandlerContext ctx, ReplState msg) { + LOG.trace("Received repl state for {}", msg.state); + replState = msg.state; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java new file mode 100644 index 0000000..c6327e2 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.rsc; + +import java.io.IOException; +import java.net.URI; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; + +import org.apache.livy.LivyClient; +import org.apache.livy.LivyClientFactory; +import org.apache.livy.rsc.rpc.RpcServer; + +/** + * Factory for RSC clients. + */ +public final class RSCClientFactory implements LivyClientFactory { + + private final AtomicInteger refCount = new AtomicInteger(); + private RpcServer server = null; + + /** + * Creates a local Livy client if the URI has the "rsc" scheme. + * <p> + * If the URI contains user information, host and port, the library will try to connect to an + * existing RSC instance with the provided information, and most of the provided configuration + * will be ignored. + * <p> + * Otherwise, a new Spark context will be started with the given configuration. + */ + @Override + public LivyClient createClient(URI uri, Properties config) { + if (!"rsc".equals(uri.getScheme())) { + return null; + } + + RSCConf lconf = new RSCConf(config); + + boolean needsServer = false; + try { + Promise<ContextInfo> info; + Process driverProcess = null; + if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() > 0) { + info = createContextInfo(uri); + } else { + needsServer = true; + ref(lconf); + DriverProcessInfo processInfo = ContextLauncher.create(this, lconf); + info = processInfo.getContextInfo(); + driverProcess = processInfo.getDriverProcess(); + } + return new RSCClient(lconf, info, driverProcess); + } catch (Exception e) { + if (needsServer) { + unref(); + } + throw Utils.propagate(e); + } + } + + RpcServer getServer() { + return server; + } + + private synchronized void ref(RSCConf config) throws IOException { + if (refCount.get() != 0) { + refCount.incrementAndGet(); + return; + } + + Utils.checkState(server == null, "Server already running 
but ref count is 0."); + if (server == null) { + try { + server = new RpcServer(config); + } catch (InterruptedException ie) { + throw Utils.propagate(ie); + } + } + + refCount.incrementAndGet(); + } + + synchronized void unref() { + if (refCount.decrementAndGet() == 0) { + server.close(); + server = null; + } + } + + private static Promise<ContextInfo> createContextInfo(final URI uri) { + String[] userInfo = uri.getUserInfo().split(":", 2); + ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE; + Promise<ContextInfo> promise = executor.newPromise(); + promise.setSuccess(new ContextInfo(uri.getHost(), uri.getPort(), userInfo[0], userInfo[1])); + return promise; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java new file mode 100644 index 0000000..c560aed --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.rsc; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import javax.security.sasl.Sasl; + +import org.apache.livy.client.common.ClientConf; + +public class RSCConf extends ClientConf<RSCConf> { + + public static final String SPARK_CONF_PREFIX = "spark."; + public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__."; + + private static final String RSC_CONF_PREFIX = "livy.rsc."; + + public static enum Entry implements ConfEntry { + CLIENT_ID("client.auth.id", null), + CLIENT_SECRET("client.auth.secret", null), + CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false), + CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"), + DRIVER_CLASS("driver-class", null), + SESSION_KIND("session.kind", null), + + LIVY_JARS("jars", null), + SPARKR_PACKAGE("sparkr.package", null), + PYSPARK_ARCHIVES("pyspark.archives", null), + + // Address for the RSC driver to connect back with it's connection info. + LAUNCHER_ADDRESS("launcher.address", null), + LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"), + // Setting up of this propety by user has no benefit. It is currently being used + // to pass port information from ContextLauncher to RSCDriver + LAUNCHER_PORT("launcher.port", -1), + // How long will the RSC wait for a connection for a Livy server before shutting itself down. 
+ SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), + + PROXY_USER("proxy-user", null), + + RPC_SERVER_ADDRESS("rpc.server.address", null), + RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"), + RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"), + RPC_CHANNEL_LOG_LEVEL("channel.log.level", null), + RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024), + RPC_MAX_THREADS("rpc.threads", 8), + RPC_SECRET_RANDOM_BITS("secret.bits", 256), + + SASL_MECHANISMS("rpc.sasl.mechanisms", "DIGEST-MD5"), + SASL_QOP("rpc.sasl.qop", null), + + TEST_STUCK_END_SESSION("test.do-not-use.stuck-end-session", false), + TEST_STUCK_START_DRIVER("test.do-not-use.stuck-start-driver", false), + + JOB_CANCEL_TRIGGER_INTERVAL("job-cancel.trigger-interval", "100ms"), + JOB_CANCEL_TIMEOUT("job-cancel.timeout", "30s"), + + RETAINED_STATEMENT_NUMBER("retained-statements", 100); + + private final String key; + private final Object dflt; + + private Entry(String key, Object dflt) { + this.key = RSC_CONF_PREFIX + key; + this.dflt = dflt; + } + + @Override + public String key() { return key; } + + @Override + public Object dflt() { return dflt; } + } + + public RSCConf() { + this(new Properties()); + } + + public RSCConf(Properties config) { + super(config); + } + + public Map<String, String> getSaslOptions() { + Map<String, String> opts = new HashMap<>(); + + // TODO: add more options? 
+ String qop = get(Entry.SASL_QOP); + if (qop != null) { + opts.put(Sasl.QOP, qop); + } + + return opts; + } + + public String findLocalAddress() throws IOException { + InetAddress address = InetAddress.getLocalHost(); + if (address.isLoopbackAddress()) { + // Address resolves to something like 127.0.1.1, which happens on Debian; + // try to find a better address using the local network interfaces + Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); + while (ifaces.hasMoreElements()) { + NetworkInterface ni = ifaces.nextElement(); + Enumeration<InetAddress> addrs = ni.getInetAddresses(); + while (addrs.hasMoreElements()) { + InetAddress addr = addrs.nextElement(); + if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress() + && addr instanceof Inet4Address) { + // We've found an address that looks reasonable! + LOG.warn("Your hostname, {}, resolves to a loopback address; using {} " + + " instead (on interface {})", address.getHostName(), addr.getHostAddress(), + ni.getName()); + LOG.warn("Set '{}' if you need to bind to another address.", + Entry.RPC_SERVER_ADDRESS.key); + return addr.getHostAddress(); + } + } + } + } + + LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find " + + "any external IP address!", address.getCanonicalHostName()); + LOG.warn("Set {} if you need to bind to another address.", + Entry.RPC_SERVER_ADDRESS.key); + return address.getCanonicalHostName(); + } + + private static final Map<String, DeprecatedConf> configsWithAlternatives + = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{ + put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS); + put(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT.key, DepConf.CLIENT_SHUTDOWN_TIMEOUT); + put(RSCConf.Entry.DRIVER_CLASS.key, DepConf.DRIVER_CLASS); + put(RSCConf.Entry.SERVER_IDLE_TIMEOUT.key, DepConf.SERVER_IDLE_TIMEOUT); + put(RSCConf.Entry.PROXY_USER.key, DepConf.PROXY_USER); + 
put(RSCConf.Entry.TEST_STUCK_END_SESSION.key, DepConf.TEST_STUCK_END_SESSION); + put(RSCConf.Entry.TEST_STUCK_START_DRIVER.key, DepConf.TEST_STUCK_START_DRIVER); + put(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL.key, DepConf.JOB_CANCEL_TRIGGER_INTERVAL); + put(RSCConf.Entry.JOB_CANCEL_TIMEOUT.key, DepConf.JOB_CANCEL_TIMEOUT); + put(RSCConf.Entry.RETAINED_STATEMENT_NUMBER.key, DepConf.RETAINED_STATEMENT_NUMBER); + }}); + + // Maps deprecated key to DeprecatedConf with the same key. + // There are no deprecated configs without alternatives currently. + private static final Map<String, DeprecatedConf> deprecatedConfigs + = Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>()); + + protected Map<String, DeprecatedConf> getConfigsWithAlternatives() { + return configsWithAlternatives; + } + + protected Map<String, DeprecatedConf> getDeprecatedConfigs() { + return deprecatedConfigs; + } + + static enum DepConf implements DeprecatedConf { + CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"), + CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"), + DRIVER_CLASS("driver_class", "0.4"), + SERVER_IDLE_TIMEOUT("server.idle_timeout", "0.4"), + PROXY_USER("proxy_user", "0.4"), + TEST_STUCK_END_SESSION("test.do_not_use.stuck_end_session", "0.4"), + TEST_STUCK_START_DRIVER("test.do_not_use.stuck_start_driver", "0.4"), + JOB_CANCEL_TRIGGER_INTERVAL("job_cancel.trigger_interval", "0.4"), + JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"), + RETAINED_STATEMENT_NUMBER("retained_statements", "0.4"); + + private final String key; + private final String version; + private final String deprecationMessage; + + private DepConf(String key, String version) { + this(key, version, ""); + } + + private DepConf(String key, String version, String deprecationMessage) { + this.key = RSC_CONF_PREFIX + key; + this.version = version; + this.deprecationMessage = deprecationMessage; + } + + @Override + public String key() { return key; } + + @Override + public String 
version() { return version; } + + @Override + public String deprecationMessage() { return deprecationMessage; } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java b/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java new file mode 100644 index 0000000..6717c59 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/ReplJobResults.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.rsc; + +import org.apache.livy.rsc.driver.Statement; + +public class ReplJobResults { + public final Statement[] statements; + + public ReplJobResults(Statement[] statements) { + this.statements = statements; + } + + public ReplJobResults() { + this(null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/Utils.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/Utils.java b/rsc/src/main/java/org/apache/livy/rsc/Utils.java new file mode 100644 index 0000000..d2c0059 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/Utils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +/** + * A few simple utility functions used by the code, mostly to avoid a direct dependency + * on Guava. 
+ */ +public class Utils { + + public static void checkArgument(boolean condition) { + if (!condition) { + throw new IllegalArgumentException(); + } + } + + public static void checkArgument(boolean condition, String msg, Object... args) { + if (!condition) { + throw new IllegalArgumentException(String.format(msg, args)); + } + } + + public static void checkState(boolean condition, String msg, Object... args) { + if (!condition) { + throw new IllegalStateException(String.format(msg, args)); + } + } + + public static void checkNotNull(Object o) { + if (o == null) { + throw new NullPointerException(); + } + } + + public static RuntimeException propagate(Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + + public static ThreadFactory newDaemonThreadFactory(final String nameFormat) { + return new ThreadFactory() { + + private final AtomicInteger threadId = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(String.format(nameFormat, threadId.incrementAndGet())); + t.setDaemon(true); + return t; + } + + }; + } + + public static String join(Iterable<String> strs, String sep) { + StringBuilder sb = new StringBuilder(); + for (String s : strs) { + if (s != null && !s.isEmpty()) { + sb.append(s).append(sep); + } + } + if (sb.length() > 0) { + sb.setLength(sb.length() - sep.length()); + } + return sb.toString(); + } + + public static String stackTraceAsString(Throwable t) { + StringBuilder sb = new StringBuilder(); + sb.append(t.getClass().getName()).append(": ").append(t.getMessage()); + for (StackTraceElement e : t.getStackTrace()) { + sb.append("\n"); + sb.append(e.toString()); + } + return sb.toString(); + } + + public static <T> void addListener(Future<T> future, final FutureListener<T> lsnr) { + future.addListener(new GenericFutureListener<Future<T>>() { + @Override + public void operationComplete(Future<T> f) throws 
Exception { + if (f.isSuccess()) { + lsnr.onSuccess(f.get()); + } else { + lsnr.onFailure(f.cause()); + } + } + }); + } + + private Utils() { } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java b/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java new file mode 100644 index 0000000..cc75b6c --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/AddFileJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.livy.rsc.driver; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class AddFileJob implements Job<Object> { + + private final String path; + + AddFileJob() { + this(null); +} + + public AddFileJob(String path) { + this.path = path; +} + + @Override + public Object call(JobContext jc) throws Exception { + JobContextImpl jobContextImpl = (JobContextImpl)jc; + jobContextImpl.addFile(path); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java b/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java new file mode 100644 index 0000000..c455e6e --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/AddJarJob.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc.driver; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class AddJarJob implements Job<Object> { + + private final String path; + + // For serialization. 
+ private AddJarJob() { + this(null); + } + + public AddJarJob(String path) { + this.path = path; + } + + @Override + public Object call(JobContext jc) throws Exception { + JobContextImpl jobContextImpl = (JobContextImpl)jc; + jobContextImpl.addJarOrPyFile(path); + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java new file mode 100644 index 0000000..f0d14c6 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJob.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.livy.rsc.driver; + +import java.nio.ByteBuffer; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; +import org.apache.livy.client.common.BufferUtils; +import org.apache.livy.client.common.Serializer; + +class BypassJob implements Job<byte[]> { + + private final Serializer serializer; + private final byte[] serializedJob; + + BypassJob(Serializer serializer, byte[] serializedJob) { + this.serializer = serializer; + this.serializedJob = serializedJob; + } + + @Override + public byte[] call(JobContext jc) throws Exception { + Job<?> job = (Job<?>) serializer.deserialize(ByteBuffer.wrap(serializedJob)); + Object result = job.call(jc); + byte[] serializedResult; + if (result != null) { + ByteBuffer data = serializer.serialize(result); + serializedResult = BufferUtils.toByteArray(data); + } else { + serializedResult = null; + } + return serializedResult; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java new file mode 100644 index 0000000..1fa5bf1 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/BypassJobWrapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc.driver; + +import java.util.List; + +import org.apache.livy.Job; +import org.apache.livy.JobHandle; +import org.apache.livy.rsc.BypassJobStatus; +import org.apache.livy.rsc.Utils; + +public class BypassJobWrapper extends JobWrapper<byte[]> { + + private volatile byte[] result; + private volatile Throwable error; + private volatile JobHandle.State state; + private volatile List<Integer> newSparkJobs; + + public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJob) { + super(driver, jobId, serializedJob); + state = JobHandle.State.QUEUED; + } + + @Override + public Void call() throws Exception { + state = JobHandle.State.STARTED; + return super.call(); + } + + @Override + protected synchronized void finished(byte[] result, Throwable error) { + if (error == null) { + this.result = result; + this.state = JobHandle.State.SUCCEEDED; + } else { + this.error = error; + this.state = JobHandle.State.FAILED; + } + } + + @Override + boolean cancel() { + if (super.cancel()) { + this.state = JobHandle.State.CANCELLED; + return true; + } + return false; + } + + @Override + protected void jobStarted() { + // Do nothing; just avoid sending data back to the driver. + } + + synchronized BypassJobStatus getStatus() { + String stackTrace = error != null ? 
Utils.stackTraceAsString(error) : null; + return new BypassJobStatus(state, result, stackTrace); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java new file mode 100644 index 0000000..ddb5713 --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Livy's implementation of {@link JobContext}: gives user jobs access to the
 * driver's Spark contexts, lazily creating the SQL/Hive/streaming contexts and
 * the SparkSession on first use.
 */
class JobContextImpl implements JobContext {

  private static final Logger LOG = LoggerFactory.getLogger(JobContextImpl.class);

  private final JavaSparkContext sc;
  private final File localTmpDir;
  // Lazily initialized via double-checked locking; volatile is required for that pattern.
  private volatile SQLContext sqlctx;
  private volatile HiveContext hivectx;
  private volatile JavaStreamingContext streamingctx;
  private final RSCDriver driver;
  // Typed Object because SparkSession is loaded reflectively (see sparkSession()).
  private volatile Object sparksession;

  public JobContextImpl(JavaSparkContext sc, File localTmpDir, RSCDriver driver) {
    this.sc = sc;
    this.localTmpDir = localTmpDir;
    this.driver = driver;
  }

  /** Returns the shared JavaSparkContext. */
  @Override
  public JavaSparkContext sc() {
    return sc;
  }

  /**
   * Lazily creates and returns the SparkSession. Built via reflection, i.e. the
   * equivalent of {@code SparkSession.builder().sparkContext(sc.sc()).getOrCreate()},
   * so this class also loads against Spark versions that lack SparkSession.
   *
   * @throws Exception if the running Spark version does not provide SparkSession.
   */
  @Override
  public Object sparkSession() throws Exception {
    if (sparksession == null) {
      synchronized (this) {
        if (sparksession == null) {
          try {
            // "SparkSession$" is the Scala companion object; MODULE$ is its singleton.
            Class<?> clz = Class.forName("org.apache.spark.sql.SparkSession$");
            Object spark = clz.getField("MODULE$").get(null);
            Method m = clz.getMethod("builder");
            Object builder = m.invoke(spark);
            builder.getClass().getMethod("sparkContext", SparkContext.class)
              .invoke(builder, sc.sc());
            sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder);
          } catch (Exception e) {
            LOG.warn("SparkSession is not supported", e);
            throw e;
          }
        }
      }
    }

    return sparksession;
  }

  /** Lazily creates (double-checked locking) and returns the shared SQLContext. */
  @Override
  public SQLContext sqlctx() {
    if (sqlctx == null) {
      synchronized (this) {
        if (sqlctx == null) {
          sqlctx = new SQLContext(sc);
        }
      }
    }
    return sqlctx;
  }

  /** Lazily creates (double-checked locking) and returns the shared HiveContext. */
  @Override
  public HiveContext hivectx() {
    if (hivectx == null) {
      synchronized (this) {
        if (hivectx == null) {
          hivectx = new HiveContext(sc.sc());
        }
      }
    }
    return hivectx;
  }

  /** Returns the streaming context; createStreamingContext(long) must have been called first. */
  @Override
  public synchronized JavaStreamingContext streamingctx(){
    Utils.checkState(streamingctx != null, "method createStreamingContext must be called first.");
    return streamingctx;
  }

  /** Creates the streaming context with the given batch duration; fails if one already exists. */
  @Override
  public synchronized void createStreamingContext(long batchDuration) {
    Utils.checkState(streamingctx == null, "Streaming context is not null.");
    streamingctx = new JavaStreamingContext(sc, new Duration(batchDuration));
  }

  /** Stops and discards the streaming context; fails if none exists. */
  @Override
  public synchronized void stopStreamingCtx() {
    Utils.checkState(streamingctx != null, "Streaming Context is null");
    streamingctx.stop();
    streamingctx = null;
  }

  /** Local scratch directory for this context's jobs. */
  @Override
  public File getLocalTmpDir() {
    return localTmpDir;
  }

  // Shuts down the streaming context (if any) before stopping the SparkContext itself.
  public synchronized void stop() {
    if (streamingctx != null) {
      stopStreamingCtx();
    }
    if (sc != null) {
      sc.stop();
    }
  }

  // Delegates to the driver; see RSCDriver.addFile for the actual behavior.
  public void addFile(String path) {
    driver.addFile(path);
  }

  // Delegates to the driver; see RSCDriver.addJarOrPyFile for the actual behavior.
  public void addJarOrPyFile(String path) throws Exception {
    driver.addJarOrPyFile(path);
  }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.rsc.driver; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.spark.api.java.JavaFutureAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.livy.Job; + +public class JobWrapper<T> implements Callable<Void> { + + private static final Logger LOG = LoggerFactory.getLogger(JobWrapper.class); + + public final String jobId; + + private final RSCDriver driver; + private final Job<T> job; + private final AtomicInteger completed; + + private Future<?> future; + + public JobWrapper(RSCDriver driver, String jobId, Job<T> job) { + this.driver = driver; + this.jobId = jobId; + this.job = job; + this.completed = new AtomicInteger(); + } + + @Override + public Void call() throws Exception { + try { + jobStarted(); + T result = job.call(driver.jobContext()); + finished(result, null); + } catch (Throwable t) { + // Catch throwables in a best-effort to report job status back to the client. It's + // re-thrown so that the executor can destroy the affected thread (or the JVM can + // die or whatever would happen if the throwable bubbled up). 
+ LOG.info("Failed to run job " + jobId, t); + finished(null, t); + throw new ExecutionException(t); + } finally { + driver.activeJobs.remove(jobId); + } + return null; + } + + void submit(ExecutorService executor) { + this.future = executor.submit(this); + } + + void jobDone() { + synchronized (completed) { + completed.incrementAndGet(); + completed.notifyAll(); + } + } + + boolean cancel() { + return future != null ? future.cancel(true) : true; + } + + protected void finished(T result, Throwable error) { + if (error == null) { + driver.jobFinished(jobId, result, null); + } else { + driver.jobFinished(jobId, null, error); + } + } + + protected void jobStarted() { + driver.jobStarted(jobId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java ---------------------------------------------------------------------- diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java b/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java new file mode 100644 index 0000000..30da79e --- /dev/null +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/MutableClassLoader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * A URLClassLoader that starts empty and exposes {@link #addURL(URL)} publicly,
 * so jars can be appended to the classpath at runtime.
 */
class MutableClassLoader extends URLClassLoader {

  private static final URL[] NO_URLS = new URL[0];

  MutableClassLoader(ClassLoader parent) {
    super(NO_URLS, parent);
  }

  // Widens the protected URLClassLoader.addURL to public.
  @Override
  public void addURL(URL url) {
    super.addURL(url);
  }

}