This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.12
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.12 by this push:
     new d955bc7ed8 [ZEPPELIN-6256] Fix resource leaks in 
SparkInterpreterLauncher.detectSparkScalaVersion
d955bc7ed8 is described below

commit d955bc7ed825db15f1c083752dbd188d0df4de69
Author: renechoi <115696395+renec...@users.noreply.github.com>
AuthorDate: Mon Aug 25 15:47:14 2025 +0900

    [ZEPPELIN-6256] Fix resource leaks in 
SparkInterpreterLauncher.detectSparkScalaVersion
    
    ## What is this PR for?
      This PR fixes resource leaks in the `detectSparkScalaVersion` method of 
`SparkInterpreterLauncher.java` by modernizing the implementation to capture 
process output directly without temporary files.
    
      **Previous issues:**
      - Resource leak from unclosed FileInputStream when reading process output
      - Disk space accumulation from undeleted temporary files
    
      **Solution:**
      - Directly capture process error stream using 
`IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8)`
      - Eliminated temporary file creation entirely for cleaner, more efficient 
implementation
    
      ## What type of PR is it?
      Bug Fix / Code Improvement
    
      ## Todos
      - [x] Code review
      - [x] CI build verification
    
      ## What is the Jira issue?
      https://issues.apache.org/jira/browse/ZEPPELIN-6256
    
      ## How should this be tested?
      **Unit test added:**
      - `testDetectSparkScalaVersionDirectStreamCapture`: Verifies the 
modernized stream capture approach works correctly and returns valid Scala 
version (2.12 or 2.13)
    
      **Manual testing:**
      1. Start Zeppelin with Spark interpreter
      2. Verify Spark interpreter launches successfully
      3. Confirm no temporary files created in temp directory
      4. Monitor system resources - no file descriptor leaks
    
      ## Screenshots (if appropriate)
      N/A
    
      ## Questions:
      - **Do the license files need to be updated?** No
      - **Are there breaking changes for older versions?** No
      - **Does this need documentation?** No
    
      ## Implementation Details
      - Modernized to capture process output directly via `IOUtils.toString()` 
without intermediate temporary files
      - Maintains full backward compatibility with no API changes
      - Cleaner, more maintainable code following modern Java practices
      - Eliminates all file system operations for output capture
    
    Closes #5000 from renechoi/ZEPPELIN-6256-upstream-clean.
    
    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
    (cherry picked from commit 55789d367fae348a750b4ff6332779f749615d4b)
    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
---
 .../launcher/SparkInterpreterLauncher.java         |  8 +++++---
 .../launcher/SparkInterpreterLauncherTest.java     | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 8898adfbec..d131c816e0 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -270,11 +270,13 @@ public class SparkInterpreterLauncher extends 
StandardInterpreterLauncher {
     LOGGER.info("Detect scala version from SPARK_HOME: {}", sparkHome);
     ProcessBuilder builder = new ProcessBuilder(sparkHome + 
"/bin/spark-submit", "--version");
     builder.environment().putAll(env);
-    File processOutputFile = File.createTempFile("zeppelin-spark", ".out");
-    builder.redirectError(processOutputFile);
+    
     Process process = builder.start();
     process.waitFor();
-    String processOutput = IOUtils.toString(new 
FileInputStream(processOutputFile), StandardCharsets.UTF_8);
+    
+    // Capture the error stream directly without using a temp file
+    String processOutput = IOUtils.toString(process.getErrorStream(), 
StandardCharsets.UTF_8);
+    
     Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*");
     Matcher matcher = pattern.matcher(processOutput);
     if (matcher.find()) {
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 4721585724..52ac5a09b5 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -39,6 +39,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+import java.lang.reflect.Method;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -330,6 +333,25 @@ class SparkInterpreterLauncherTest {
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }
 
+  @Test
+  void testDetectSparkScalaVersionDirectStreamCapture() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    
+    // Use reflection to access private method
+    Method detectSparkScalaVersionMethod = 
SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersion", String.class, Map.class);
+    detectSparkScalaVersionMethod.setAccessible(true);
+    
+    Map<String, String> env = new HashMap<>();
+    
+    // Call the method
+    String scalaVersion = (String) 
detectSparkScalaVersionMethod.invoke(launcher, sparkHome, env);
+    
+    // Verify we got a valid result
+    assertTrue(scalaVersion.equals("2.12") || scalaVersion.equals("2.13"), 
+        "Expected scala version 2.12 or 2.13 but got: " + scalaVersion);
+  }
+  
   @Test
   void testDetectSparkScalaVersionByReplClassWithNonExistentDirectory() throws 
Exception {
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);

Reply via email to