This is an automated email from the ASF dual-hosted git repository.
lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 674ac00a LIVY-1030 cleanup of context and clients (#506)
(edwardcapriolo via lmccay)
674ac00a is described below
commit 674ac00ab28c4a259cb5e9b3ccf9afe706b9aad1
Author: Edward Capriolo <[email protected]>
AuthorDate: Wed Jan 21 14:29:41 2026 -0500
LIVY-1030 cleanup of context and clients (#506) (edwardcapriolo via lmccay)
---
.../java/org/apache/livy/LivyClientBuilder.java | 29 +++----
.../java/org/apache/livy/LivyClientFactory.java | 6 +-
.../java/org/apache/livy/TestClientFactory.java | 11 ++-
.../apache/livy/client/http/HttpClientFactory.java | 8 +-
.../java/org/apache/livy/rsc/ContextLauncher.java | 88 +++++++++-------------
.../java/org/apache/livy/rsc/RSCClientFactory.java | 10 +--
rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 10 +--
7 files changed, 74 insertions(+), 88 deletions(-)
diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java
b/api/src/main/java/org/apache/livy/LivyClientBuilder.java
index 1d7ec018..d736648c 100644
--- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java
+++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java
@@ -23,11 +23,8 @@ import java.io.Reader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
+import java.util.*;
+
import static java.nio.charset.StandardCharsets.UTF_8;
/**
@@ -134,20 +131,24 @@ public final class LivyClientBuilder {
LivyClient client = null;
if (CLIENT_FACTORIES.isEmpty()) {
- throw new IllegalStateException("No LivyClientFactory implementation was
found.");
+ throw new IllegalStateException("No LivyClientFactory implementations
were found.");
}
for (LivyClientFactory factory : CLIENT_FACTORIES) {
try {
- client = factory.createClient(uri, config);
- } catch (Exception e) {
- if (!(e instanceof RuntimeException)) {
- e = new RuntimeException(e);
+ Optional<LivyClient> found = factory.createClient(uri, config);
+ if (found.isPresent()){
+ client = found.get();
+ break;
+ }
+ } catch (Exception e){
+ //Note: the compiler identifies this as impossible. The initial author
might
+ // have believed the factories sneakily throw other exceptions
+ if (e instanceof RuntimeException){
+ throw e;
+ } else {
+ throw new RuntimeException(e);
}
- throw (RuntimeException) e;
- }
- if (client != null) {
- break;
}
}
diff --git a/api/src/main/java/org/apache/livy/LivyClientFactory.java
b/api/src/main/java/org/apache/livy/LivyClientFactory.java
index 1b9c72b6..975ec9d0 100644
--- a/api/src/main/java/org/apache/livy/LivyClientFactory.java
+++ b/api/src/main/java/org/apache/livy/LivyClientFactory.java
@@ -18,6 +18,7 @@
package org.apache.livy;
import java.net.URI;
+import java.util.Optional;
import java.util.Properties;
import org.apache.livy.annotations.Private;
@@ -39,8 +40,9 @@ public interface LivyClientFactory {
*
* @param uri URI pointing at the livy backend to use.
* @param config Livy client configs.
- * @return The newly created LivyClient or null if an unsupported URI
+ * @return an Optional holding the client if the factory understands the URI, empty otherwise
+ * @throws RuntimeException if the factory supports the scheme but fails to create a
client
*/
- LivyClient createClient(URI uri, Properties config);
+ Optional<LivyClient> createClient(URI uri, Properties config);
}
diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java
b/api/src/test/java/org/apache/livy/TestClientFactory.java
index 622908c0..a4a4413a 100644
--- a/api/src/test/java/org/apache/livy/TestClientFactory.java
+++ b/api/src/test/java/org/apache/livy/TestClientFactory.java
@@ -19,13 +19,14 @@ package org.apache.livy;
import java.io.File;
import java.net.URI;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class TestClientFactory implements LivyClientFactory {
- private static AtomicLong instanceCount = new AtomicLong();
+ private static final AtomicLong instanceCount = new AtomicLong();
public static long getInstanceCount() {
return instanceCount.get();
}
@@ -35,16 +36,14 @@ public class TestClientFactory implements LivyClientFactory
{
}
@Override
- public LivyClient createClient(URI uri, Properties config) {
+ public Optional<LivyClient> createClient(URI uri, Properties config) {
switch (uri.getPath()) {
case "match":
- return new Client(config);
-
+ return Optional.of(new Client(config));
case "error":
throw new IllegalStateException("error");
-
default:
- return null;
+ return Optional.empty();
}
}
diff --git
a/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
b/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
index 622a74e5..873ea321 100644
---
a/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
+++
b/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
@@ -18,6 +18,7 @@
package org.apache.livy.client.http;
import java.net.URI;
+import java.util.Optional;
import java.util.Properties;
import org.apache.livy.LivyClient;
@@ -29,12 +30,11 @@ import org.apache.livy.LivyClientFactory;
public final class HttpClientFactory implements LivyClientFactory {
@Override
- public LivyClient createClient(URI uri, Properties config) {
+ public Optional<LivyClient> createClient(URI uri, Properties config) {
if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
- return null;
+ return Optional.empty();
}
-
- return new HttpClient(uri, new HttpConf(config));
+ return Optional.of(new HttpClient(uri, new HttpConf(config)));
}
}
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index c59136d5..8c449490 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -17,12 +17,9 @@
package org.apache.livy.rsc;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
@@ -66,7 +63,6 @@ class ContextLauncher {
private static final Logger LOG =
LoggerFactory.getLogger(ContextLauncher.class);
private static final AtomicInteger CHILD_IDS = new AtomicInteger();
- private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
private static final String SPARK_JARS_KEY = "spark.jars";
private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -95,9 +91,6 @@ class ContextLauncher {
final RegistrationHandler handler = new RegistrationHandler();
try {
factory.getServer().registerClient(clientId, secret, handler);
- String replMode = conf.get("repl");
- boolean repl = replMode != null && replMode.equals("true");
-
// In some scenarios the user may need to configure this endpoint
setting explicitly.
String address = conf.get(LAUNCHER_ADDRESS);
// If not specified, use the RPC server address; otherwise use the
specified address.
@@ -112,7 +105,7 @@ class ContextLauncher {
Utils.addListener(promise, new FutureListener<ContextInfo>() {
@Override
public void onFailure(Throwable error) throws Exception {
- // If promise is cancelled or failed, make sure spark-submit is not
leaked.
+ // If promise is canceled or failed, make sure spark-submit is not
leaked.
if (child != null) {
child.kill();
}
@@ -181,13 +174,16 @@ class ContextLauncher {
}
Utils.checkState(rscJars.isDirectory(),
- "Cannot find rsc jars directory under LIVY_HOME.");
+ "Cannot find rsc jars directory: " + rscJars.getAbsolutePath());
allJars.add(rscJars);
List<String> jars = new ArrayList<>();
for (File dir : allJars) {
- for (File f : dir.listFiles()) {
- jars.add(f.getAbsolutePath());
+ File [] list = dir.listFiles();
+ if (list != null) {
+ for (File f : list) {
+ jars.add(f.getAbsolutePath());
+ }
}
}
livyJars = Utils.join(jars, ",");
@@ -226,14 +222,11 @@ class ContextLauncher {
} else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
// Mostly for testing things quickly. Do not do this in production.
LOG.warn("!!!! Running remote driver in-process. !!!!");
- Runnable child = new Runnable() {
- @Override
- public void run() {
- try {
- RSCDriverBootstrapper.main(new String[] {
confFile.getAbsolutePath() });
- } catch (Exception e) {
- throw Utils.propagate(e);
- }
+ Runnable child = () -> {
+ try {
+ RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath()
});
+ } catch (Exception e) {
+ throw Utils.propagate(e);
}
};
return new ChildProcess(conf, promise, child, confFile);
@@ -261,7 +254,7 @@ class ContextLauncher {
* Write the configuration to a file readable only by the process's owner.
Livy properties
* are written with an added prefix so that they can be loaded using
SparkConf on the driver
* side.
- *
+ * <br>
* The default Spark configuration (from either SPARK_HOME or
SPARK_CONF_DIR) is merged into
* the user configuration, so that defaults set by Livy's admin take effect
when not overridden
* by the user.
@@ -286,13 +279,10 @@ class ContextLauncher {
File sparkDefaults = new File(confDir + File.separator +
"spark-defaults.conf");
if (sparkDefaults.isFile()) {
Properties sparkConf = new Properties();
- Reader r = new InputStreamReader(new FileInputStream(sparkDefaults),
UTF_8);
- try {
- sparkConf.load(r);
- } finally {
- r.close();
+ try (Reader r = new InputStreamReader(
+ Files.newInputStream(sparkDefaults.toPath()), UTF_8)) {
+ sparkConf.load(r);
}
-
for (String key : sparkConf.stringPropertyNames()) {
if (!confView.containsKey(key)) {
confView.put(key, sparkConf.getProperty(key));
@@ -303,15 +293,9 @@ class ContextLauncher {
File file = File.createTempFile("livyConf", ".properties");
Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ,
OWNER_WRITE));
- //file.deleteOnExit();
-
- Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
- try {
- confView.store(writer, "Livy App Context Configuration");
- } finally {
- writer.close();
+ try (Writer writer = new OutputStreamWriter(new FileOutputStream(file),
UTF_8)) {
+ confView.store(writer, "Livy App Context Configuration");
}
-
return file;
}
@@ -340,14 +324,16 @@ class ContextLauncher {
}
}
- private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
+ //Note: your compiler or IDE may identify this method as unused,
+ //but tests fail without it
+ public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
InetSocketAddress insocket = (InetSocketAddress)
ctx.channel().remoteAddress();
String ip = insocket.getAddress().getHostAddress();
ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret);
if (promise.trySuccess(info)) {
timeout.cancel(true);
LOG.debug("Received driver info for client {}: {}/{}.",
client.getChannel(),
- msg.host, msg.port);
+ msg.host, msg.port);
} else {
LOG.warn("Connection established but promise is already finalized.");
}
@@ -398,7 +384,7 @@ class ContextLauncher {
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
- Thread.interrupted();
+ boolean ignored = Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
@@ -436,36 +422,34 @@ class ContextLauncher {
try {
monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
} catch (InterruptedException ie) {
- LOG.debug("Interrupted before driver thread was finished.");
+ LOG.debug("Interrupted before driver thread was finished.", ie);
}
}
private Thread monitor(final Runnable task, int childId) {
- Runnable wrappedTask = new Runnable() {
- @Override
- public void run() {
- try {
- task.run();
- } finally {
- confFile.delete();
- }
+ Runnable wrappedTask = () -> {
+ try {
+ task.run();
+ } finally {
+ boolean ignored = confFile.delete();
}
};
Thread thread = new Thread(wrappedTask);
thread.setDaemon(true);
thread.setName("ContextLauncher-" + childId);
- thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.warn("Child task threw exception.", e);
- fail(e);
- }
+ thread.setUncaughtExceptionHandler((t, e) -> {
+ LOG.warn("Child task threw exception.", e);
+ fail(e);
});
thread.start();
return thread;
}
}
+ public RSCConf getConf() {
+ return conf;
+ }
+
// Just for testing.
static Process mockSparkSubmit;
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
index d9d56fb6..85ebfe0a 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
@@ -19,6 +19,7 @@ package org.apache.livy.rsc;
import java.io.IOException;
import java.net.URI;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,7 +38,7 @@ public final class RSCClientFactory implements
LivyClientFactory {
private final AtomicInteger refCount = new AtomicInteger();
private RpcServer server = null;
// interactive session child processes number
- private static AtomicInteger iscpn = new AtomicInteger();
+ private static final AtomicInteger iscpn = new AtomicInteger();
public static AtomicInteger childProcesses() {
return iscpn;
@@ -53,13 +54,12 @@ public final class RSCClientFactory implements
LivyClientFactory {
* Otherwise, a new Spark context will be started with the given
configuration.
*/
@Override
- public LivyClient createClient(URI uri, Properties config) {
+ public Optional<LivyClient> createClient(URI uri, Properties config) {
if (!"rsc".equals(uri.getScheme())) {
- return null;
+ return Optional.empty();
}
RSCConf lconf = new RSCConf(config);
-
boolean needsServer = false;
try {
Promise<ContextInfo> info;
@@ -73,7 +73,7 @@ public final class RSCClientFactory implements
LivyClientFactory {
info = processInfo.getContextInfo();
driverProcess = processInfo.getDriverProcess();
}
- return new RSCClient(lconf, info, driverProcess);
+ return Optional.of(new RSCClient(lconf, info, driverProcess));
} catch (Exception e) {
if (needsServer) {
unref();
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 4c45956d..933948fa 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -37,7 +37,7 @@ public class RSCConf extends ClientConf<RSCConf> {
public static final String RSC_CONF_PREFIX = "livy.rsc.";
- public static enum Entry implements ConfEntry {
+ public enum Entry implements ConfEntry {
CLIENT_ID("client.auth.id", null),
CLIENT_SECRET("client.auth.secret", null),
CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
@@ -87,7 +87,7 @@ public class RSCConf extends ClientConf<RSCConf> {
private final String key;
private final Object dflt;
- private Entry(String key, Object dflt) {
+ Entry(String key, Object dflt) {
this.key = RSC_CONF_PREFIX + key;
this.dflt = dflt;
}
@@ -178,7 +178,7 @@ public class RSCConf extends ClientConf<RSCConf> {
return deprecatedConfigs;
}
- static enum DepConf implements DeprecatedConf {
+ enum DepConf implements DeprecatedConf {
CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"),
CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"),
DRIVER_CLASS("driver_class", "0.4"),
@@ -194,11 +194,11 @@ public class RSCConf extends ClientConf<RSCConf> {
private final String version;
private final String deprecationMessage;
- private DepConf(String key, String version) {
+ DepConf(String key, String version) {
this(key, version, "");
}
- private DepConf(String key, String version, String deprecationMessage) {
+ DepConf(String key, String version, String deprecationMessage) {
this.key = RSC_CONF_PREFIX + key;
this.version = version;
this.deprecationMessage = deprecationMessage;