This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 01efbd11ae [ZEPPELIN-6259] Add null safety checks in SparkInterpreterLauncher.detectSparkScalaVersionByReplClass
01efbd11ae is described below

commit 01efbd11aee1857123b1d8bbcf15dcb26772df0c
Author: renechoi <115696395+renec...@users.noreply.github.com>
AuthorDate: Thu Aug 21 15:36:51 2025 +0900

[ZEPPELIN-6259] Add null safety checks in SparkInterpreterLauncher.detectSparkScalaVersionByReplClass

### What is this PR for?
This PR adds null safety checks to the `detectSparkScalaVersionByReplClass` method in `SparkInterpreterLauncher.java` to prevent a NullPointerException and to provide clear error messages when the Spark jars directory is inaccessible.

### Current Issues Fixed:
1. **NullPointerException Risk**: `listFiles()` returns null when the directory doesn't exist or is inaccessible
2. **Poor Error Messages**: Users get a cryptic NPE instead of meaningful error messages
3. **Missing Validation**: No checks for directory existence or type

### What type of PR is it?
Bug Fix / Improvement

### Todos
* [ ] - Code review
* [ ] - CI build verification

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-6259

### How should this be tested?
* **Unit tests added**:
  - `testDetectSparkScalaVersionByReplClassWithNonExistentDirectory` - verifies the error when the directory doesn't exist
  - `testDetectSparkScalaVersionByReplClassWithFileInsteadOfDirectory` - verifies the error when the path is a file
  - `testDetectSparkScalaVersionByReplClassWithValidDirectory` - verifies that normal operation works
  - `testDetectSparkScalaVersionByReplClassWithEmptyDirectory` - verifies the error when no spark-repl jar is found
* **Manual testing**:
  - Set an invalid SPARK_HOME and verify the clear error messages
  - Remove read permissions on SPARK_HOME/jars and verify the permission error
* **CI**: All existing tests pass

### Screenshots (if appropriate)
N/A

### Questions:
* Do the license files need to be updated? **No**
* Are there breaking changes for older versions? **No**
* Does this need documentation? **No**

### Implementation Details
- Added a directory existence check with a clear error message
- Added directory type validation (ensures the path is not a file)
- Added handling for an inaccessible jars directory (the case where `listFiles()` previously returned null), with a permission hint
- All error messages include the problematic path for easier debugging
- Used IOException for file-system-related errors (consistent with Java conventions)

A minimal illustrative sketch of this validation pattern is shown below, ahead of the full diff.

### Error Message Examples
Before (NPE):

    java.lang.NullPointerException
        at java.util.stream.Stream.of(Stream.java:1012)

After (clear messages):

    java.io.IOException: Spark jars directory does not exist: /invalid/path/jars. Please check your SPARK_HOME setting.
    java.io.IOException: Spark jars path is not a directory: /some/file/jars
    java.io.IOException: Cannot access Spark jars directory: /restricted/jars. Please check permissions.

### Benefits
1. **Better User Experience**: Clear error messages help users quickly identify and fix configuration issues
2. **Defensive Programming**: Prevents crashes from null pointer exceptions
3. **Easier Debugging**: Specific error messages with paths make troubleshooting straightforward
4. **Production Ready**: Handles edge cases that can occur in various deployment environments

Closes #4999 from renechoi/ZEPPELIN-6259-upstream-clean.
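For quick reference, the following is a minimal, self-contained sketch of the null-safety pattern described under "Implementation Details". The class and method names (`SparkJarsCheck`, `listSparkReplJars`) are illustrative only and are not part of the patch; the committed change is the diff for `SparkInterpreterLauncher.java` further below.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; see the committed SparkInterpreterLauncher.java diff below.
public class SparkJarsCheck {

  static List<Path> listSparkReplJars(String sparkHome) throws IOException {
    Path jars = Paths.get(sparkHome, "jars");

    // Fail fast with the offending path instead of letting a later null dereference blow up.
    if (!Files.exists(jars)) {
      throw new IOException("Spark jars directory does not exist: " + jars.toAbsolutePath()
          + ". Please check your SPARK_HOME setting.");
    }
    if (!Files.isDirectory(jars)) {
      throw new IOException("Spark jars path is not a directory: " + jars.toAbsolutePath());
    }

    // Unlike File#listFiles(), which returns null for an unreadable or non-directory path,
    // Files.newDirectoryStream throws an IOException that can be wrapped with a permission hint.
    List<Path> sparkReplJars = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(jars, "spark-repl_*.jar")) {
      for (Path entry : stream) {
        sparkReplJars.add(entry);
      }
    } catch (IOException e) {
      throw new IOException("Cannot access Spark jars directory: " + jars.toAbsolutePath()
          + ". Please check permissions.", e);
    }
    return sparkReplJars;
  }

  public static void main(String[] args) throws IOException {
    // Example: java SparkJarsCheck /opt/spark
    String sparkHome = args.length > 0 ? args[0] : "/opt/spark";
    System.out.println(listSparkReplJars(sparkHome));
  }
}
```

The design point is that `Files.newDirectoryStream` reports an unreadable or non-directory path as an `IOException` rather than a null return, which is what makes the specific error messages quoted above possible.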
Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
---
 .../launcher/SparkInterpreterLauncher.java     |  42 ++--
 .../launcher/SparkInterpreterLauncherTest.java | 219 +++++++++++++++++++++
 2 files changed, 249 insertions(+), 12 deletions(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 33b3e4ba62..8898adfbec 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -292,23 +292,41 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   private String detectSparkScalaVersionByReplClass(String sparkHome) throws Exception {
-    File sparkJarsFolder = new File(sparkHome + "/jars");
-    File[] sparkJarFiles = sparkJarsFolder.listFiles();
-    long sparkReplFileNum =
-        Stream.of(sparkJarFiles).filter(file -> file.getName().contains("spark-repl_")).count();
-    if (sparkReplFileNum == 0) {
+    Path sparkJarsPath = Paths.get(sparkHome, "jars");
+
+    // Check if the directory exists
+    if (!Files.exists(sparkJarsPath)) {
+      throw new IOException("Spark jars directory does not exist: " + sparkJarsPath.toAbsolutePath() +
+          ". Please check your SPARK_HOME setting.");
+    }
+
+    // Check if it's actually a directory
+    if (!Files.isDirectory(sparkJarsPath)) {
+      throw new IOException("Spark jars path is not a directory: " + sparkJarsPath.toAbsolutePath());
+    }
+
+    // List files using DirectoryStream
+    List<Path> sparkReplJars = new ArrayList<>();
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(sparkJarsPath, "spark-repl_*.jar")) {
+      for (Path entry : stream) {
+        sparkReplJars.add(entry);
+      }
+    } catch (IOException e) {
+      throw new IOException("Cannot access Spark jars directory: " + sparkJarsPath.toAbsolutePath() +
+          ". Please check permissions.", e);
+    }
+
+    if (sparkReplJars.isEmpty()) {
       throw new Exception("No spark-repl jar found in SPARK_HOME: " + sparkHome);
     }
-    if (sparkReplFileNum > 1) {
+    if (sparkReplJars.size() > 1) {
       throw new Exception("Multiple spark-repl jar found in SPARK_HOME: " + sparkHome);
     }
-    boolean sparkRepl212Exists =
-        Stream.of(sparkJarFiles).anyMatch(file -> file.getName().contains("spark-repl_2.12"));
-    boolean sparkRepl213Exists =
-        Stream.of(sparkJarFiles).anyMatch(file -> file.getName().contains("spark-repl_2.13"));
-    if (sparkRepl212Exists) {
+
+    String fileName = sparkReplJars.get(0).getFileName().toString();
+    if (fileName.contains("spark-repl_2.12")) {
       return "2.12";
-    } else if (sparkRepl213Exists) {
+    } else if (fileName.contains("spark-repl_2.13")) {
       return "2.13";
     } else {
       throw new Exception("Can not detect the scala version by spark-repl");
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index c1dc975b3c..4721585724 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -31,15 +31,19 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 class SparkInterpreterLauncherTest {
 
@@ -325,4 +329,219 @@ class SparkInterpreterLauncherTest {
     }
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithNonExistentDirectory() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Test with non-existent directory
+    String nonExistentSparkHome = "/tmp/non-existent-spark-home-" + System.currentTimeMillis();
+
+    try {
+      detectMethod.invoke(launcher, nonExistentSparkHome);
+      fail("Expected IOException for non-existent directory");
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertTrue(cause instanceof IOException, "Expected IOException but got: " + cause.getClass());
+      assertTrue(cause.getMessage().contains("does not exist"),
+          "Error message should mention directory does not exist: " + cause.getMessage());
+      assertTrue(cause.getMessage().contains("SPARK_HOME"),
+          "Error message should mention SPARK_HOME: " + cause.getMessage());
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithFileInsteadOfDirectory() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary file (not directory)
+    File tempFile = File.createTempFile("spark-test", ".tmp");
+    tempFile.deleteOnExit();
+
+    // Create a fake SPARK_HOME that points to a parent directory
+    String fakeSparkHome = tempFile.getParent() + "/" + tempFile.getName().replace(".tmp", "");
+
+    // Rename temp file to simulate jars path as a file
+    File jarsFile = new File(fakeSparkHome + "/jars");
+    jarsFile.getParentFile().mkdirs();
+    tempFile.renameTo(jarsFile);
+    jarsFile.deleteOnExit();
+
+    try {
+      detectMethod.invoke(launcher, fakeSparkHome);
+      fail("Expected IOException for file instead of directory");
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertTrue(cause instanceof IOException, "Expected IOException but got: " + cause.getClass());
+      assertTrue(cause.getMessage().contains("not a directory"),
+          "Error message should mention not a directory: " + cause.getMessage());
+    } finally {
+      jarsFile.delete();
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithValidDirectory() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary directory structure
+    Path tempSparkHome = Files.createTempDirectory("spark-test");
+    Path jarsDir = tempSparkHome.resolve("jars");
+    Files.createDirectories(jarsDir);
+
+    // Create a fake spark-repl jar
+    Path sparkReplJar = jarsDir.resolve("spark-repl_2.12-3.0.0.jar");
+    Files.createFile(sparkReplJar);
+
+    try {
+      String scalaVersion = (String) detectMethod.invoke(launcher, tempSparkHome.toString());
+      assertEquals("2.12", scalaVersion, "Should detect Scala 2.12");
+    } finally {
+      // Clean up
+      Files.deleteIfExists(sparkReplJar);
+      Files.deleteIfExists(jarsDir);
+      Files.deleteIfExists(tempSparkHome);
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithEmptyDirectory() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary directory structure with empty jars directory
+    Path tempSparkHome = Files.createTempDirectory("spark-test");
+    Path jarsDir = tempSparkHome.resolve("jars");
+    Files.createDirectories(jarsDir);
+
+    try {
+      detectMethod.invoke(launcher, tempSparkHome.toString());
+      fail("Expected Exception for no spark-repl jar");
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertTrue(cause.getMessage().contains("No spark-repl jar found"),
+          "Error message should mention no spark-repl jar found: " + cause.getMessage());
+    } finally {
+      // Clean up
+      Files.deleteIfExists(jarsDir);
+      Files.deleteIfExists(tempSparkHome);
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithMultipleJars() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary directory structure with multiple spark-repl jars
+    Path tempSparkHome = Files.createTempDirectory("spark-test");
+    Path jarsDir = tempSparkHome.resolve("jars");
+    Files.createDirectories(jarsDir);
+
+    // Create multiple spark-repl jars
+    Path sparkReplJar1 = jarsDir.resolve("spark-repl_2.12-3.0.0.jar");
+    Path sparkReplJar2 = jarsDir.resolve("spark-repl_2.13-3.1.0.jar");
+    Files.createFile(sparkReplJar1);
+    Files.createFile(sparkReplJar2);
+
+    try {
+      detectMethod.invoke(launcher, tempSparkHome.toString());
+      fail("Expected Exception for multiple spark-repl jars");
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertTrue(cause.getMessage().contains("Multiple spark-repl jar found"),
+          "Error message should mention multiple spark-repl jars found: " + cause.getMessage());
+    } finally {
+      // Clean up
+      Files.deleteIfExists(sparkReplJar1);
+      Files.deleteIfExists(sparkReplJar2);
+      Files.deleteIfExists(jarsDir);
+      Files.deleteIfExists(tempSparkHome);
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithScala213() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary directory structure
+    Path tempSparkHome = Files.createTempDirectory("spark-test");
+    Path jarsDir = tempSparkHome.resolve("jars");
+    Files.createDirectories(jarsDir);
+
+    // Create a fake spark-repl jar for Scala 2.13
+    Path sparkReplJar = jarsDir.resolve("spark-repl_2.13-3.2.0.jar");
+    Files.createFile(sparkReplJar);
+
+    try {
+      String scalaVersion = (String) detectMethod.invoke(launcher, tempSparkHome.toString());
+      assertEquals("2.13", scalaVersion, "Should detect Scala 2.13");
+    } finally {
+      // Clean up
+      Files.deleteIfExists(sparkReplJar);
+      Files.deleteIfExists(jarsDir);
+      Files.deleteIfExists(tempSparkHome);
+    }
+  }
+
+  @Test
+  void testDetectSparkScalaVersionByReplClassWithUnsupportedScalaVersion() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersionByReplClass", String.class);
+    detectMethod.setAccessible(true);
+
+    // Create a temporary directory structure
+    Path tempSparkHome = Files.createTempDirectory("spark-test");
+    Path jarsDir = tempSparkHome.resolve("jars");
+    Files.createDirectories(jarsDir);
+
+    // Create a fake spark-repl jar with unsupported Scala version
+    Path sparkReplJar = jarsDir.resolve("spark-repl_2.11-2.4.0.jar");
+    Files.createFile(sparkReplJar);
+
+    try {
+      detectMethod.invoke(launcher, tempSparkHome.toString());
+      fail("Expected Exception for unsupported Scala version");
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      assertTrue(cause.getMessage().contains("Can not detect the scala version by spark-repl"),
+          "Error message should mention cannot detect scala version: " + cause.getMessage());
+    } finally {
+      // Clean up
+      Files.deleteIfExists(sparkReplJar);
+      Files.deleteIfExists(jarsDir);
+      Files.deleteIfExists(tempSparkHome);
+    }
+  }
 }