This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 6edb9f1015 [ZEPPELIN-6260] Fix memory leak in WebSocket watcher connections 6edb9f1015 is described below commit 6edb9f10152feb250d220bc84cb67f6637a87358 Author: renechoi <115696395+renec...@users.noreply.github.com> AuthorDate: Fri Jul 25 22:24:30 2025 +0900 [ZEPPELIN-6260] Fix memory leak in WebSocket watcher connections ### What is this PR for? This PR fixes a memory leak issue where WebSocket connections that are switched to watcher mode are never removed from the watcherSockets queue when the connection is closed. This causes the queue to grow indefinitely, leading to increased memory usage over time and a potential OutOfMemoryError in long-running Zeppelin servers. ### What type of PR is it? Bug Fix ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-6260 ### How should this be tested? Steps to reproduce the issue: 1. Open a WebSocket connection to Zeppelin 2. Send a WATCHER message to switch the connection to watcher mode 3. Close the connection 4. The connection remains in the watcherSockets queue indefinitely Verification after fix: - Unit tests have been added to verify the fix: - removeWatcherConnectionCleansQueue: Basic functionality test - removeWatcherConnectionWithMultipleWatchers: Tests selective removal - removeWatcherConnectionConcurrentTest: Tests thread safety - switchConnectionToWatcherAndRemove: Tests complete lifecycle ### Screenshots (if appropriate) N/A ### Questions: - Do the license files need an update? No - Are there breaking changes for older versions? No - Does this need documentation? No ### Description of changes: 1. Added a removeWatcherConnection() method to ConnectionManager to safely remove connections from the watcherSockets queue 2. Modified NotebookServer.removeConnection() to call removeWatcherConnection() when any connection is closed 3. Added debug logging to track watcher connection removal 4. 
Added comprehensive unit tests including concurrent access scenarios The fix is minimal and safe: - The ConcurrentLinkedQueue.remove() operation is safe even if the element doesn't exist - The synchronization block is very short, minimizing performance impact - The approach ensures no watcher connections are leaked, regardless of how they were closed ### Related observations: During the investigation, I noticed that broadcastToWatchers() doesn't remove watchers when IOException occurs. This could cause performance degradation as closed connections would repeatedly fail. However, this is a separate issue and should be addressed in a different PR to keep this fix focused. Closes #5001 from renechoi/ZEPPELIN-6260-fix-watcher-memory-leak. Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- .../apache/zeppelin/socket/ConnectionManager.java | 8 ++ .../org/apache/zeppelin/socket/NotebookServer.java | 1 + .../zeppelin/socket/ConnectionManagerTest.java | 111 +++++++++++++++++++++ 3 files changed, 120 insertions(+) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index c4c0a3aa5d..a348d218af 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -175,6 +175,14 @@ public class ConnectionManager { } } + public void removeWatcherConnection(NotebookSocket conn) { + synchronized (watcherSockets) { + if (watcherSockets.remove(conn)) { + LOGGER.debug("Removed watcher connection: {}", conn); + } + } + } + public String getAssociatedNoteId(NotebookSocket socket) { String associatedNoteId = null; synchronized (noteSocketMap) { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 57dade43c0..03637fbd2c 100644 
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -503,6 +503,7 @@ public class NotebookServer implements AngularObjectRegistryListener, private void removeConnection(NotebookSocket notebookSocket) { connectionManager.removeConnection(notebookSocket); + connectionManager.removeWatcherConnection(notebookSocket); connectionManager.removeConnectionFromAllNote(notebookSocket); connectionManager.removeUserConnection(notebookSocket.getUser(), notebookSocket); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java index e574fd4e17..92adc93c79 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java @@ -17,10 +17,21 @@ package org.apache.zeppelin.socket; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.AuthorizationService; +import org.apache.zeppelin.util.WatcherSecurityKey; import org.junit.jupiter.api.Test; class ConnectionManagerTest { @@ -57,4 +68,104 @@ class ConnectionManagerTest { manager.removeConnectionFromAllNote(socket); assertEquals(0, manager.noteSocketMap.size()); } + + @Test + void removeWatcherConnectionCleansQueue() { + AuthorizationService authService = 
mock(AuthorizationService.class); + + ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load()); + NotebookSocket socket = mock(NotebookSocket.class); + + manager.watcherSockets.add(socket); + assertEquals(1, manager.watcherSockets.size()); + + manager.removeWatcherConnection(socket); + assertEquals(0, manager.watcherSockets.size()); + } + + @Test + void removeWatcherConnectionWithMultipleWatchers() { + AuthorizationService authService = mock(AuthorizationService.class); + + ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load()); + NotebookSocket socket1 = mock(NotebookSocket.class); + NotebookSocket socket2 = mock(NotebookSocket.class); + NotebookSocket socket3 = mock(NotebookSocket.class); + + // Add multiple watchers + manager.watcherSockets.add(socket1); + manager.watcherSockets.add(socket2); + manager.watcherSockets.add(socket3); + assertEquals(3, manager.watcherSockets.size()); + + // Remove only socket2 + manager.removeWatcherConnection(socket2); + assertEquals(2, manager.watcherSockets.size()); + assertTrue(manager.watcherSockets.contains(socket1)); + assertFalse(manager.watcherSockets.contains(socket2)); + assertTrue(manager.watcherSockets.contains(socket3)); + } + + @Test + void removeWatcherConnectionConcurrentTest() throws InterruptedException { + AuthorizationService authService = mock(AuthorizationService.class); + ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load()); + + int threadCount = 10; + List<NotebookSocket> sockets = new ArrayList<>(); + + // Create and add multiple watcher sockets + for (int i = 0; i < threadCount; i++) { + NotebookSocket socket = mock(NotebookSocket.class); + sockets.add(socket); + manager.watcherSockets.add(socket); + } + + assertEquals(threadCount, manager.watcherSockets.size()); + + // Remove sockets concurrently + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = 
new CountDownLatch(threadCount); + + for (NotebookSocket socket : sockets) { + executor.submit(() -> { + manager.removeWatcherConnection(socket); + latch.countDown(); + }); + } + + // Wait for all threads to complete + assertTrue(latch.await(5, TimeUnit.SECONDS)); + executor.shutdown(); + + // Verify all sockets were removed + assertEquals(0, manager.watcherSockets.size()); + } + + @Test + void switchConnectionToWatcherAndRemove() { + AuthorizationService authService = mock(AuthorizationService.class); + ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load()); + + NotebookSocket socket = mock(NotebookSocket.class); + when(socket.getUser()).thenReturn("testUser"); + when(socket.getHeader(WatcherSecurityKey.HTTP_HEADER)).thenReturn(WatcherSecurityKey.getKey()); + + // Add socket as regular connection first + manager.addConnection(socket); + manager.addUserConnection("testUser", socket); + + // Switch to watcher + manager.switchConnectionToWatcher(socket); + + // Verify it's in watcher queue + assertTrue(manager.watcherSockets.contains(socket)); + assertFalse(manager.connectedSockets.contains(socket)); + + // Remove watcher connection + manager.removeWatcherConnection(socket); + + // Verify it's completely removed + assertFalse(manager.watcherSockets.contains(socket)); + } }