This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch branch-0.12 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.12 by this push: new d955bc7ed8 [ZEPPELIN-6256] Fix resource leaks in SparkInterpreterLauncher.detectSparkScalaVersion d955bc7ed8 is described below commit d955bc7ed825db15f1c083752dbd188d0df4de69 Author: renechoi <115696395+renec...@users.noreply.github.com> AuthorDate: Mon Aug 25 15:47:14 2025 +0900 [ZEPPELIN-6256] Fix resource leaks in SparkInterpreterLauncher.detectSparkScalaVersion ## What is this PR for? This PR fixes resource leaks in the `detectSparkScalaVersion` method of `SparkInterpreterLauncher.java` by modernizing the implementation to capture process output directly without temporary files. **Previous issues:** - Resource leak from unclosed FileInputStream when reading process output - Disk space accumulation from undeleted temporary files **Solution:** - Directly capture process error stream using `IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8)` - Eliminated temporary file creation entirely for cleaner, more efficient implementation ## What type of PR is it? Bug Fix / Code Improvement ## Todos - [x] Code review - [x] CI build verification ## What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-6256 ## How should this be tested? **Unit test added:** - `testDetectSparkScalaVersionDirectStreamCapture`: Verifies the modernized stream capture approach works correctly and returns valid Scala version (2.12 or 2.13) **Manual testing:** 1. Start Zeppelin with Spark interpreter 2. Verify Spark interpreter launches successfully 3. Confirm no temporary files created in temp directory 4. 
Monitor system resources - no file descriptor leaks ## Screenshots (if appropriate) N/A ## Questions: - **Do the license files need to be updated?** No - **Are there breaking changes for older versions?** No - **Does this need documentation?** No ## Implementation Details - Modernized to capture process output directly via `IOUtils.toString()` without intermediate temporary files - Maintains full backward compatibility with no API changes - Cleaner, more maintainable code following modern Java practices - Eliminates all file system operations for output capture Closes #5000 from renechoi/ZEPPELIN-6256-upstream-clean. Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> (cherry picked from commit 55789d367fae348a750b4ff6332779f749615d4b) Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- .../launcher/SparkInterpreterLauncher.java | 8 +++++--- .../launcher/SparkInterpreterLauncherTest.java | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 8898adfbec..d131c816e0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -270,11 +270,13 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { LOGGER.info("Detect scala version from SPARK_HOME: {}", sparkHome); ProcessBuilder builder = new ProcessBuilder(sparkHome + "/bin/spark-submit", "--version"); builder.environment().putAll(env); - File processOutputFile = File.createTempFile("zeppelin-spark", ".out"); - builder.redirectError(processOutputFile); + Process process = builder.start(); process.waitFor(); - String processOutput = IOUtils.toString(new 
FileInputStream(processOutputFile), StandardCharsets.UTF_8); + + // Capture the error stream directly without using a temp file + String processOutput = IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8); + Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*"); Matcher matcher = pattern.matcher(processOutput); if (matcher.find()) { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index 4721585724..52ac5a09b5 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -39,6 +39,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Map; +import java.util.HashMap; +import java.lang.reflect.Method; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -330,6 +333,25 @@ class SparkInterpreterLauncherTest { FileUtils.deleteDirectory(localRepoPath.toFile()); } + @Test + void testDetectSparkScalaVersionDirectStreamCapture() throws Exception { + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + + // Use reflection to access private method + Method detectSparkScalaVersionMethod = SparkInterpreterLauncher.class.getDeclaredMethod( + "detectSparkScalaVersion", String.class, Map.class); + detectSparkScalaVersionMethod.setAccessible(true); + + Map<String, String> env = new HashMap<>(); + + // Call the method + String scalaVersion = (String) detectSparkScalaVersionMethod.invoke(launcher, sparkHome, env); + + // Verify we got a valid result + assertTrue(scalaVersion.equals("2.12") || scalaVersion.equals("2.13"), + "Expected scala version 
2.12 or 2.13 but got: " + scalaVersion); + } + @Test void testDetectSparkScalaVersionByReplClassWithNonExistentDirectory() throws Exception { SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);