This is an automated email from the ASF dual-hosted git repository.

lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 674ac00a LIVY-1030 cleanup of context and clients (#506) 
(edwardcapriolo via lmccay)
674ac00a is described below

commit 674ac00ab28c4a259cb5e9b3ccf9afe706b9aad1
Author: Edward Capriolo <[email protected]>
AuthorDate: Wed Jan 21 14:29:41 2026 -0500

    LIVY-1030 cleanup of context and clients (#506) (edwardcapriolo via lmccay)
---
 .../java/org/apache/livy/LivyClientBuilder.java    | 29 +++----
 .../java/org/apache/livy/LivyClientFactory.java    |  6 +-
 .../java/org/apache/livy/TestClientFactory.java    | 11 ++-
 .../apache/livy/client/http/HttpClientFactory.java |  8 +-
 .../java/org/apache/livy/rsc/ContextLauncher.java  | 88 +++++++++-------------
 .../java/org/apache/livy/rsc/RSCClientFactory.java | 10 +--
 rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 10 +--
 7 files changed, 74 insertions(+), 88 deletions(-)

diff --git a/api/src/main/java/org/apache/livy/LivyClientBuilder.java 
b/api/src/main/java/org/apache/livy/LivyClientBuilder.java
index 1d7ec018..d736648c 100644
--- a/api/src/main/java/org/apache/livy/LivyClientBuilder.java
+++ b/api/src/main/java/org/apache/livy/LivyClientBuilder.java
@@ -23,11 +23,8 @@ import java.io.Reader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
+import java.util.*;
+
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
@@ -134,20 +131,24 @@ public final class LivyClientBuilder {
 
     LivyClient client = null;
     if (CLIENT_FACTORIES.isEmpty()) {
-      throw new IllegalStateException("No LivyClientFactory implementation was 
found.");
+      throw new IllegalStateException("No LivyClientFactory implementations 
were found.");
     }
 
     for (LivyClientFactory factory : CLIENT_FACTORIES) {
       try {
-        client = factory.createClient(uri, config);
-      } catch (Exception e) {
-        if (!(e instanceof RuntimeException)) {
-          e = new RuntimeException(e);
+        Optional<LivyClient> found = factory.createClient(uri, config);
+        if (found.isPresent()){
+          client = found.get();
+          break;
+        }
+      } catch (Exception e){
+        // Note: the compiler identifies this as impossible. The initial author 
might
+        // have believed the factories sneakily throw other exceptions.
+        if (e instanceof RuntimeException){
+          throw e;
+        } else {
+          throw new RuntimeException(e);
         }
-        throw (RuntimeException) e;
-      }
-      if (client != null) {
-        break;
       }
     }
 
diff --git a/api/src/main/java/org/apache/livy/LivyClientFactory.java 
b/api/src/main/java/org/apache/livy/LivyClientFactory.java
index 1b9c72b6..975ec9d0 100644
--- a/api/src/main/java/org/apache/livy/LivyClientFactory.java
+++ b/api/src/main/java/org/apache/livy/LivyClientFactory.java
@@ -18,6 +18,7 @@
 package org.apache.livy;
 
 import java.net.URI;
+import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.livy.annotations.Private;
@@ -39,8 +40,9 @@ public interface LivyClientFactory {
    *
    * @param uri URI pointing at the livy backend to use.
    * @param config Livy client configs.
-   * @return The newly created LivyClient or null if an unsupported URI
+   * @return the client if the factory understands the URI, otherwise empty
+   * @throws RuntimeException if the factory supports the scheme but fails to 
create a client
    */
-  LivyClient createClient(URI uri, Properties config);
+  Optional<LivyClient> createClient(URI uri, Properties config);
 
 }
diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java 
b/api/src/test/java/org/apache/livy/TestClientFactory.java
index 622908c0..a4a4413a 100644
--- a/api/src/test/java/org/apache/livy/TestClientFactory.java
+++ b/api/src/test/java/org/apache/livy/TestClientFactory.java
@@ -19,13 +19,14 @@ package org.apache.livy;
 
 import java.io.File;
 import java.net.URI;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TestClientFactory implements LivyClientFactory {
 
-  private static AtomicLong instanceCount = new AtomicLong();
+  private static final AtomicLong instanceCount = new AtomicLong();
   public static long getInstanceCount() {
     return instanceCount.get();
   }
@@ -35,16 +36,14 @@ public class TestClientFactory implements LivyClientFactory 
{
   }
 
   @Override
-  public LivyClient createClient(URI uri, Properties config) {
+  public Optional<LivyClient> createClient(URI uri, Properties config) {
     switch (uri.getPath()) {
       case "match":
-        return new Client(config);
-
+        return Optional.of(new Client(config));
       case "error":
         throw new IllegalStateException("error");
-
       default:
-        return null;
+        return Optional.empty();
     }
   }
 
diff --git 
a/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java 
b/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
index 622a74e5..873ea321 100644
--- 
a/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
+++ 
b/client-http/src/main/java/org/apache/livy/client/http/HttpClientFactory.java
@@ -18,6 +18,7 @@
 package org.apache.livy.client.http;
 
 import java.net.URI;
+import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.livy.LivyClient;
@@ -29,12 +30,11 @@ import org.apache.livy.LivyClientFactory;
 public final class HttpClientFactory implements LivyClientFactory {
 
   @Override
-  public LivyClient createClient(URI uri, Properties config) {
+  public Optional<LivyClient> createClient(URI uri, Properties config) {
     if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
-      return null;
+      return Optional.empty();
     }
-
-    return new HttpClient(uri, new HttpConf(config));
+    return Optional.of(new HttpClient(uri, new HttpConf(config)));
   }
 
 }
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java 
b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index c59136d5..8c449490 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -17,12 +17,9 @@
 
 package org.apache.livy.rsc;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Reader;
@@ -66,7 +63,6 @@ class ContextLauncher {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContextLauncher.class);
   private static final AtomicInteger CHILD_IDS = new AtomicInteger();
 
-  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
   private static final String SPARK_JARS_KEY = "spark.jars";
   private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -95,9 +91,6 @@ class ContextLauncher {
     final RegistrationHandler handler = new RegistrationHandler();
     try {
       factory.getServer().registerClient(clientId, secret, handler);
-      String replMode = conf.get("repl");
-      boolean repl = replMode != null && replMode.equals("true");
-
       // In some scenarios the user may need to configure this endpoint 
setting explicitly.
       String address = conf.get(LAUNCHER_ADDRESS);
       // If not specified, use the RPC server address; otherwise use the 
specified address.
@@ -112,7 +105,7 @@ class ContextLauncher {
       Utils.addListener(promise, new FutureListener<ContextInfo>() {
         @Override
         public void onFailure(Throwable error) throws Exception {
-          // If promise is cancelled or failed, make sure spark-submit is not 
leaked.
+          // If promise is canceled or failed, make sure spark-submit is not 
leaked.
           if (child != null) {
             child.kill();
           }
@@ -181,13 +174,16 @@ class ContextLauncher {
       }
 
       Utils.checkState(rscJars.isDirectory(),
-        "Cannot find rsc jars directory under LIVY_HOME.");
+        "Cannot find rsc jars directory: " + rscJars.getAbsolutePath());
       allJars.add(rscJars);
 
       List<String> jars = new ArrayList<>();
       for (File dir : allJars) {
-        for (File f : dir.listFiles()) {
-           jars.add(f.getAbsolutePath());
+        File [] list = dir.listFiles();
+        if (list != null) {
+          for (File f : list) {
+            jars.add(f.getAbsolutePath());
+          }
         }
       }
       livyJars = Utils.join(jars, ",");
@@ -226,14 +222,11 @@ class ContextLauncher {
     } else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
       // Mostly for testing things quickly. Do not do this in production.
       LOG.warn("!!!! Running remote driver in-process. !!!!");
-      Runnable child = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            RSCDriverBootstrapper.main(new String[] { 
confFile.getAbsolutePath() });
-          } catch (Exception e) {
-            throw Utils.propagate(e);
-          }
+      Runnable child = () -> {
+        try {
+          RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() 
});
+        } catch (Exception e) {
+          throw Utils.propagate(e);
         }
       };
       return new ChildProcess(conf, promise, child, confFile);
@@ -261,7 +254,7 @@ class ContextLauncher {
    * Write the configuration to a file readable only by the process's owner. 
Livy properties
    * are written with an added prefix so that they can be loaded using 
SparkConf on the driver
    * side.
-   *
+   * <br>
    * The default Spark configuration (from either SPARK_HOME or 
SPARK_CONF_DIR) is merged into
    * the user configuration, so that defaults set by Livy's admin take effect 
when not overridden
    * by the user.
@@ -286,13 +279,10 @@ class ContextLauncher {
       File sparkDefaults = new File(confDir + File.separator + 
"spark-defaults.conf");
       if (sparkDefaults.isFile()) {
         Properties sparkConf = new Properties();
-        Reader r = new InputStreamReader(new FileInputStream(sparkDefaults), 
UTF_8);
-        try {
-          sparkConf.load(r);
-        } finally {
-          r.close();
+        try (Reader r = new InputStreamReader(
+                Files.newInputStream(sparkDefaults.toPath()), UTF_8)) {
+            sparkConf.load(r);
         }
-
         for (String key : sparkConf.stringPropertyNames()) {
           if (!confView.containsKey(key)) {
             confView.put(key, sparkConf.getProperty(key));
@@ -303,15 +293,9 @@ class ContextLauncher {
 
     File file = File.createTempFile("livyConf", ".properties");
     Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, 
OWNER_WRITE));
-    //file.deleteOnExit();
-
-    Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
-    try {
-      confView.store(writer, "Livy App Context Configuration");
-    } finally {
-      writer.close();
+    try (Writer writer = new OutputStreamWriter(new FileOutputStream(file), 
UTF_8)) {
+        confView.store(writer, "Livy App Context Configuration");
     }
-
     return file;
   }
 
@@ -340,14 +324,16 @@ class ContextLauncher {
       }
     }
 
-    private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
+    // Note: your compiler or IDE may identify this method as unused,
+    // but tests fail without it.
+    public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
       InetSocketAddress insocket = (InetSocketAddress) 
ctx.channel().remoteAddress();
       String ip = insocket.getAddress().getHostAddress();
       ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret);
       if (promise.trySuccess(info)) {
         timeout.cancel(true);
         LOG.debug("Received driver info for client {}: {}/{}.", 
client.getChannel(),
-          msg.host, msg.port);
+                msg.host, msg.port);
       } else {
         LOG.warn("Connection established but promise is already finalized.");
       }
@@ -398,7 +384,7 @@ class ContextLauncher {
             }
           } catch (InterruptedException ie) {
             LOG.warn("Waiting thread interrupted, killing child process.");
-            Thread.interrupted();
+            boolean ignored = Thread.interrupted();
             child.destroy();
           } catch (Exception e) {
             LOG.warn("Exception while waiting for child process.", e);
@@ -436,36 +422,34 @@ class ContextLauncher {
       try {
         monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
       } catch (InterruptedException ie) {
-        LOG.debug("Interrupted before driver thread was finished.");
+        LOG.debug("Interrupted before driver thread was finished.", ie);
       }
     }
 
     private Thread monitor(final Runnable task, int childId) {
-      Runnable wrappedTask = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            task.run();
-          } finally {
-            confFile.delete();
-          }
+      Runnable wrappedTask = () -> {
+        try {
+          task.run();
+        } finally {
+          boolean ignored = confFile.delete();
         }
       };
       Thread thread = new Thread(wrappedTask);
       thread.setDaemon(true);
       thread.setName("ContextLauncher-" + childId);
-      thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() 
{
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-          LOG.warn("Child task threw exception.", e);
-          fail(e);
-        }
+      thread.setUncaughtExceptionHandler((t, e) -> {
+        LOG.warn("Child task threw exception.", e);
+        fail(e);
       });
       thread.start();
       return thread;
     }
   }
 
+  public RSCConf getConf() {
+    return conf;
+  }
+
   // Just for testing.
   static Process mockSparkSubmit;
 
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
index d9d56fb6..85ebfe0a 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
@@ -19,6 +19,7 @@ package org.apache.livy.rsc;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,7 +38,7 @@ public final class RSCClientFactory implements 
LivyClientFactory {
   private final AtomicInteger refCount = new AtomicInteger();
   private RpcServer server = null;
   // interactive session child processes number
-  private static AtomicInteger iscpn = new AtomicInteger();
+  private static final AtomicInteger iscpn = new AtomicInteger();
 
   public static AtomicInteger childProcesses() {
     return iscpn;
@@ -53,13 +54,12 @@ public final class RSCClientFactory implements 
LivyClientFactory {
    * Otherwise, a new Spark context will be started with the given 
configuration.
    */
   @Override
-  public LivyClient createClient(URI uri, Properties config) {
+  public Optional<LivyClient> createClient(URI uri, Properties config) {
     if (!"rsc".equals(uri.getScheme())) {
-      return null;
+      return Optional.empty();
     }
 
     RSCConf lconf = new RSCConf(config);
-
     boolean needsServer = false;
     try {
       Promise<ContextInfo> info;
@@ -73,7 +73,7 @@ public final class RSCClientFactory implements 
LivyClientFactory {
         info = processInfo.getContextInfo();
         driverProcess = processInfo.getDriverProcess();
       }
-      return new RSCClient(lconf, info, driverProcess);
+      return Optional.of(new RSCClient(lconf, info, driverProcess));
     } catch (Exception e) {
       if (needsServer) {
         unref();
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 4c45956d..933948fa 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -37,7 +37,7 @@ public class RSCConf extends ClientConf<RSCConf> {
 
   public static final String RSC_CONF_PREFIX = "livy.rsc.";
 
-  public static enum Entry implements ConfEntry {
+  public enum Entry implements ConfEntry {
     CLIENT_ID("client.auth.id", null),
     CLIENT_SECRET("client.auth.secret", null),
     CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
@@ -87,7 +87,7 @@ public class RSCConf extends ClientConf<RSCConf> {
     private final String key;
     private final Object dflt;
 
-    private Entry(String key, Object dflt) {
+    Entry(String key, Object dflt) {
       this.key = RSC_CONF_PREFIX + key;
       this.dflt = dflt;
     }
@@ -178,7 +178,7 @@ public class RSCConf extends ClientConf<RSCConf> {
     return deprecatedConfigs;
   }
 
-  static enum DepConf implements DeprecatedConf {
+  enum DepConf implements DeprecatedConf {
     CLIENT_IN_PROCESS("client.do_not_use.run_driver_in_process", "0.4"),
     CLIENT_SHUTDOWN_TIMEOUT("client.shutdown_timeout", "0.4"),
     DRIVER_CLASS("driver_class", "0.4"),
@@ -194,11 +194,11 @@ public class RSCConf extends ClientConf<RSCConf> {
     private final String version;
     private final String deprecationMessage;
 
-    private DepConf(String key, String version) {
+    DepConf(String key, String version) {
       this(key, version, "");
     }
 
-    private DepConf(String key, String version, String deprecationMessage) {
+    DepConf(String key, String version, String deprecationMessage) {
       this.key = RSC_CONF_PREFIX + key;
       this.version = version;
       this.deprecationMessage = deprecationMessage;

Reply via email to