Repository: zeppelin Updated Branches: refs/heads/master 001c621c7 -> e10332c93
ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager ### What is this PR for? This a refactoring PR which move all websocket connection related stuff into class `ConnectionManager` ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3735 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <[email protected]> Closes #3151 from zjffdu/ZEPPELIN-3735 and squashes the following commits: ca00ad803 [Jeff Zhang] ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e10332c9 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e10332c9 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e10332c9 Branch: refs/heads/master Commit: e10332c9366a4c712d3c063cc1323590c463ee05 Parents: 001c621 Author: Jeff Zhang <[email protected]> Authored: Mon Aug 20 14:35:07 2018 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Aug 22 15:21:20 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/socket/ConnectionManager.java | 442 ++++++++++++++++++ .../apache/zeppelin/socket/NotebookServer.java | 464 ++++--------------- .../zeppelin/socket/NotebookServerTest.java | 8 +- 3 files changed, 530 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java new file mode 100644 index 0000000..5d02d9f --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.socket; + + +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NotebookAuthorization; +import org.apache.zeppelin.notebook.NotebookImportDeserializer; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.socket.Message; +import org.apache.zeppelin.notebook.socket.WatcherMessage; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.util.WatcherSecurityKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Manager class for managing websocket connections + */ +public class ConnectionManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); + private static Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") + .registerTypeAdapter(Date.class, new NotebookImportDeserializer()) + .setPrettyPrinting() + .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); + + final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); + // noteId -> connection + final Map<String, List<NotebookSocket>> noteSocketMap = new ConcurrentHashMap<>(); + // user -> connection + final Map<String, Queue<NotebookSocket>> userSocketMap = new ConcurrentHashMap<>(); + + /** + * This is a special endpoint in the notebook websoket, Every connection in this Queue + * will be able to watch every websocket event, it doesnt need to be listed into the map of + * noteSocketMap. This can be used to get information about websocket traffic and watch what + * is going on. + */ + final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue(); + + private HashSet<String> collaborativeModeList = new HashSet<>(); + private Boolean collaborativeModeEnable = ZeppelinConfiguration + .create() + .isZeppelinNotebookCollaborativeModeEnable(); + + + public void addConnection(NotebookSocket conn) { + connectedSockets.add(conn); + } + + public void removeConnection(NotebookSocket conn) { + connectedSockets.remove(conn); + } + + public void addNoteConnection(String noteId, NotebookSocket socket) { + LOGGER.debug("Add connection {} to note: {}", socket, noteId); + synchronized (noteSocketMap) { + // make sure a socket relates only an single note. + removeConnectionFromAllNote(socket); + List<NotebookSocket> socketList = noteSocketMap.get(noteId); + if (socketList == null) { + socketList = new LinkedList<>(); + noteSocketMap.put(noteId, socketList); + } + if (!socketList.contains(socket)) { + socketList.add(socket); + } + checkCollaborativeStatus(noteId, socketList); + } + } + + public void removeNoteConnection(String noteId) { + synchronized (noteSocketMap) { + noteSocketMap.remove(noteId); + } + } + + public void removeNoteConnection(String noteId, NotebookSocket socket) { + LOGGER.debug("Remove connection {} from note: {}", socket, noteId); + synchronized (noteSocketMap) { + List<NotebookSocket> socketList = noteSocketMap.get(noteId); + if (socketList != null) { + socketList.remove(socket); + } + checkCollaborativeStatus(noteId, socketList); + } + } + + public void addUserConnection(String user, NotebookSocket conn) { + LOGGER.debug("Add user connection {} for user: {}", conn, user); + conn.setUser(user); + if (userSocketMap.containsKey(user)) { + userSocketMap.get(user).add(conn); + } else { + Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>(); + socketQueue.add(conn); + userSocketMap.put(user, socketQueue); + } + } + + public void removeUserConnection(String user, NotebookSocket conn) { + LOGGER.debug("Remove user connection {} for user: {}", conn, user); + if (userSocketMap.containsKey(user)) { + userSocketMap.get(user).remove(conn); + } else { + LOGGER.warn("Closing connection that is absent in user connections"); + } + } + + public String getAssociatedNoteId(NotebookSocket socket) { + String associatedNoteId = null; + synchronized (noteSocketMap) { + Set<String> noteIds = noteSocketMap.keySet(); + for (String noteId : noteIds) { + List<NotebookSocket> sockets = noteSocketMap.get(noteId); + if (sockets.contains(socket)) { + associatedNoteId = noteId; + } + } + } + + return associatedNoteId; + } + + public void removeConnectionFromAllNote(NotebookSocket socket) { + synchronized (noteSocketMap) { + Set<String> noteIds = noteSocketMap.keySet(); + for (String noteId : noteIds) { + removeConnectionFromNote(noteId, socket); + } + } + } + + private void removeConnectionFromNote(String noteId, NotebookSocket socket) { + LOGGER.debug("Remove connection {} from note: {}", socket, noteId); + synchronized (noteSocketMap) { + List<NotebookSocket> socketList = noteSocketMap.get(noteId); + if (socketList != null) { + socketList.remove(socket); + } + checkCollaborativeStatus(noteId, socketList); + } + } + + private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) { + if (!collaborativeModeEnable) { + return; + } + boolean collaborativeStatusNew = socketList.size() > 1; + if (collaborativeStatusNew) { + collaborativeModeList.add(noteId); + } else { + collaborativeModeList.remove(noteId); + } + + Message message = new Message(Message.OP.COLLABORATIVE_MODE_STATUS); + message.put("status", collaborativeStatusNew); + if (collaborativeStatusNew) { + HashSet<String> userList = new HashSet<>(); + for (NotebookSocket noteSocket : socketList) { + userList.add(noteSocket.getUser()); + } + message.put("users", userList); + } + broadcast(noteId, message); + } + + + protected String serializeMessage(Message m) { + return gson.toJson(m); + } + + public void broadcast(Message m) { + synchronized (connectedSockets) { + for (NotebookSocket ns : connectedSockets) { + try { + ns.send(serializeMessage(m)); + } catch (IOException e) { + LOGGER.error("Send error: " + m, e); + } + } + } + } + + public void broadcast(String noteId, Message m) { + List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); + synchronized (noteSocketMap) { + broadcastToWatchers(noteId, StringUtils.EMPTY, m); + List<NotebookSocket> socketLists = noteSocketMap.get(noteId); + if (socketLists == null || socketLists.size() == 0) { + return; + } + socketsToBroadcast = new ArrayList<>(socketLists); + } + LOGGER.debug("SEND >> " + m); + for (NotebookSocket conn : socketsToBroadcast) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOGGER.error("socket error", e); + } + } + } + + private void broadcastToWatchers(String noteId, String subject, Message message) { + synchronized (watcherSockets) { + for (NotebookSocket watcher : watcherSockets) { + try { + watcher.send( + WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message)) + .build().toJson()); + } catch (IOException e) { + LOGGER.error("Cannot broadcast message to watcher", e); + } + } + } + } + + public void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { + List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); + synchronized (noteSocketMap) { + broadcastToWatchers(noteId, StringUtils.EMPTY, m); + List<NotebookSocket> socketLists = noteSocketMap.get(noteId); + if (socketLists == null || socketLists.size() == 0) { + return; + } + socketsToBroadcast = new ArrayList<>(socketLists); + } + + LOGGER.debug("SEND >> " + m); + for (NotebookSocket conn : socketsToBroadcast) { + if (exclude.equals(conn)) { + continue; + } + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOGGER.error("socket error", e); + } + } + } + + /** + * Send websocket message to all connections regardless of notebook id. + */ + public void broadcastToAllConnections(String serialized) { + broadcastToAllConnectionsExcept(null, serialized); + } + + public void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serializedMsg) { + synchronized (connectedSockets) { + for (NotebookSocket conn : connectedSockets) { + if (exclude != null && exclude.equals(conn)) { + continue; + } + + try { + conn.send(serializedMsg); + } catch (IOException e) { + LOGGER.error("Cannot broadcast message to conn", e); + } + } + } + } + + public Set<String> getConnectedUsers() { + Set<String> connectedUsers = Sets.newHashSet(); + for (NotebookSocket notebookSocket : connectedSockets) { + connectedUsers.add(notebookSocket.getUser()); + } + return connectedUsers; + } + + + public void multicastToUser(String user, Message m) { + if (!userSocketMap.containsKey(user)) { + LOGGER.warn("Multicasting to user {} that is not in connections map", user); + return; + } + + for (NotebookSocket conn : userSocketMap.get(user)) { + unicast(m, conn); + } + } + + public void unicast(Message m, NotebookSocket conn) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOGGER.error("socket error", e); + } + broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m); + } + + public void unicastParagraph(Note note, Paragraph p, String user) { + if (!note.isPersonalizedMode() || p == null || user == null) { + return; + } + + if (!userSocketMap.containsKey(user)) { + LOGGER.warn("Failed to send unicast. user {} that is not in connections map", user); + return; + } + + for (NotebookSocket conn : userSocketMap.get(user)) { + Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p); + unicast(m, conn); + } + } + + public void broadcastNoteListExcept(List<Map<String, String>> notesInfo, + AuthenticationInfo subject) { + Set<String> userAndRoles; + NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); + for (String user : userSocketMap.keySet()) { + if (subject.getUser().equals(user)) { + continue; + } + //reloaded already above; parameter - false + userAndRoles = authInfo.getRoles(user); + userAndRoles.add(user); + // TODO(zjffdu) is it ok for comment the following line ? + // notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles); + multicastToUser(user, new Message(Message.OP.NOTES_INFO).put("notes", notesInfo)); + } + } + + public void broadcastNote(Note note) { + broadcast(note.getId(), new Message(Message.OP.NOTE).put("note", note)); + } + + public void broadcastParagraph(Note note, Paragraph p) { + broadcastNoteForms(note); + + if (note.isPersonalizedMode()) { + broadcastParagraphs(p.getUserParagraphMap(), p); + } else { + broadcast(note.getId(), new Message(Message.OP.PARAGRAPH).put("paragraph", p)); + } + } + + public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap, + Paragraph defaultParagraph) { + if (null != userParagraphMap) { + for (String user : userParagraphMap.keySet()) { + multicastToUser(user, + new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); + } + } + } + + private void broadcastNewParagraph(Note note, Paragraph para) { + LOGGER.info("Broadcasting paragraph on run call instead of note."); + int paraIndex = note.getParagraphs().indexOf(para); + broadcast(note.getId(), + new Message(Message.OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); + } + + // public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { + // if (subject == null) { + // subject = new AuthenticationInfo(StringUtils.EMPTY); + // } + // //send first to requesting user + // List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles); + // multicastToUser(subject.getUser(), new Message(Message.OP.NOTES_INFO) + // .put("notes", notesInfo)); + // //to others afterwards + // broadcastNoteListExcept(notesInfo, subject); + // } + + + private void broadcastNoteForms(Note note) { + GUI formsSettings = new GUI(); + formsSettings.setForms(note.getNoteForms()); + formsSettings.setParams(note.getNoteParams()); + broadcast(note.getId(), new Message(Message.OP.SAVE_NOTE_FORMS) + .put("formsData", formsSettings)); + } + + public void switchConnectionToWatcher(NotebookSocket conn) { + if (!isSessionAllowedToSwitchToWatcher(conn)) { + LOGGER.error("Cannot switch this client to watcher, invalid security key"); + return; + } + LOGGER.info("Going to add {} to watcher socket", conn); + // add the connection to the watcher. + if (watcherSockets.contains(conn)) { + LOGGER.info("connection alrerady present in the watcher"); + return; + } + watcherSockets.add(conn); + + // remove this connection from regular zeppelin ws usage. + removeConnection(conn); + removeConnectionFromAllNote(conn); + removeUserConnection(conn.getUser(), conn); + } + + private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) { + String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER); + return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey + .equals(WatcherSecurityKey.getKey())); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 16719f3..a3fce8f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -17,8 +17,6 @@ package org.apache.zeppelin.socket; import com.google.common.base.Strings; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; @@ -52,7 +50,6 @@ import org.apache.zeppelin.notebook.ParagraphWithRuntimeInfo; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; -import org.apache.zeppelin.notebook.socket.WatcherMessage; import org.apache.zeppelin.rest.exception.ForbiddenException; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; @@ -64,7 +61,6 @@ import org.apache.zeppelin.service.SimpleServiceCallback; import org.apache.zeppelin.ticket.TicketContainer; import org.apache.zeppelin.types.InterpreterSettingsList; import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.util.WatcherSecurityKey; import org.apache.zeppelin.utils.InterpreterBindingUtils; import org.apache.zeppelin.utils.SecurityUtils; import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch; @@ -86,17 +82,13 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.regex.Matcher; @@ -130,7 +122,7 @@ public class NotebookServer extends WebSocketServlet } - private HashSet<String> collaborativeModeList = new HashSet<>(); + // private HashSet<String> collaborativeModeList = new HashSet<>(); private Boolean collaborativeModeEnable = ZeppelinConfiguration .create() .isZeppelinNotebookCollaborativeModeEnable(); @@ -141,23 +133,17 @@ public class NotebookServer extends WebSocketServlet .setPrettyPrinting() .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); - final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>(); - final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); - final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>(); - + private ConnectionManager connectionManager; private NotebookService notebookService; private ConfigurationService configurationService; private JobManagerService jobManagerService; private ExecutorService executorService = Executors.newFixedThreadPool(10); - /** - * This is a special endpoint in the notebook websoket, Every connection in this Queue - * will be able to watch every websocket event, it doesnt need to be listed into the map of - * noteSocketMap. This can be used to get information about websocket traffic and watch what - * is going on. - */ - final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue(); + + public NotebookServer() { + this.connectionManager = new ConnectionManager(); + } private Notebook notebook() { return ZeppelinServer.notebook; @@ -198,14 +184,10 @@ public class NotebookServer extends WebSocketServlet return false; } - public NotebookSocket doWebSocketConnect(HttpServletRequest req, String protocol) { - return new NotebookSocket(req, protocol, this); - } - @Override public void onOpen(NotebookSocket conn) { LOG.info("New connection from {}", conn); - connectedSockets.add(conn); + connectionManager.addConnection(conn); } @Override @@ -258,7 +240,7 @@ public class NotebookServer extends WebSocketServlet } } if (StringUtils.isEmpty(conn.getUser())) { - addUserConnection(messagereceived.principal, conn); + connectionManager.addUserConnection(messagereceived.principal, conn); } AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal, messagereceived.roles, @@ -404,7 +386,7 @@ public class NotebookServer extends WebSocketServlet getInterpreterSettings(conn); break; case WATCHER: - switchConnectionToWatcher(conn, messagereceived); + connectionManager.switchConnectionToWatcher(conn); break; case SAVE_NOTE_FORMS: saveNoteForms(conn, messagereceived); @@ -426,30 +408,13 @@ public class NotebookServer extends WebSocketServlet @Override public void onClose(NotebookSocket conn, int code, String reason) { LOG.info("Closed connection to {} ({}) {}", conn, code, reason); - removeConnectionFromAllNote(conn); - connectedSockets.remove(conn); - removeUserConnection(conn.getUser(), conn); - } - - private void removeUserConnection(String user, NotebookSocket conn) { - LOG.debug("Remove user connection {} for user: {}", conn, user); - if (userConnectedSockets.containsKey(user)) { - userConnectedSockets.get(user).remove(conn); - } else { - LOG.warn("Closing connection that is absent in user connections"); - } + connectionManager.removeConnection(conn); + connectionManager.removeConnectionFromAllNote(conn); + connectionManager.removeUserConnection(conn.getUser(), conn); } - private void addUserConnection(String user, NotebookSocket conn) { - LOG.debug("Add user connection {} for user: {}", conn, user); - conn.setUser(user); - if (userConnectedSockets.containsKey(user)) { - userConnectedSockets.get(user).add(conn); - } else { - Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>(); - socketQueue.add(conn); - userConnectedSockets.put(user, socketQueue); - } + public ConnectionManager getConnectionManager() { + return connectionManager; } protected Message deserializeMessage(String msg) { @@ -460,165 +425,13 @@ public class NotebookServer extends WebSocketServlet return gson.toJson(m); } - private void addConnectionToNote(String noteId, NotebookSocket socket) { - LOG.debug("Add connection {} to note: {}", socket, noteId); - synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a - // single note. - List<NotebookSocket> socketList = noteSocketMap.get(noteId); - if (socketList == null) { - socketList = new LinkedList<>(); - noteSocketMap.put(noteId, socketList); - } - if (!socketList.contains(socket)) { - socketList.add(socket); - } - checkCollaborativeStatus(noteId, socketList); - } - } - - private void removeConnectionFromNote(String noteId, NotebookSocket socket) { - LOG.debug("Remove connection {} from note: {}", socket, noteId); - synchronized (noteSocketMap) { - List<NotebookSocket> socketList = noteSocketMap.get(noteId); - if (socketList != null) { - socketList.remove(socket); - } - checkCollaborativeStatus(noteId, socketList); - } - } - - private void removeConn(String noteId) { - synchronized (noteSocketMap) { - List<NotebookSocket> socketList = noteSocketMap.remove(noteId); - } - } - - private void removeConnectionFromAllNote(NotebookSocket socket) { - synchronized (noteSocketMap) { - Set<String> keys = noteSocketMap.keySet(); - for (String noteId : keys) { - removeConnectionFromNote(noteId, socket); - } - } - } - - private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) { - if (!collaborativeModeEnable) { - return; - } - boolean collaborativeStatusNew = socketList.size() > 1; - if (collaborativeStatusNew) { - collaborativeModeList.add(noteId); - } else { - collaborativeModeList.remove(noteId); - } - - Message message = new Message(OP.COLLABORATIVE_MODE_STATUS); - message.put("status", collaborativeStatusNew); - if (collaborativeStatusNew) { - HashSet<String> userList = new HashSet<>(); - for (NotebookSocket noteSocket : socketList) { - userList.add(noteSocket.getUser()); - } - message.put("users", userList); - } - broadcast(noteId, message); - } - - private String getOpenNoteId(NotebookSocket socket) { - String id = null; - synchronized (noteSocketMap) { - Set<String> keys = noteSocketMap.keySet(); - for (String noteId : keys) { - List<NotebookSocket> sockets = noteSocketMap.get(noteId); - if (sockets.contains(socket)) { - id = noteId; - } - } - } - - return id; - } - public void broadcast(Message m) { - synchronized (connectedSockets) { - for (NotebookSocket ns : connectedSockets) { - try { - ns.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("Send error: " + m, e); - } - } - } - } - - private void broadcast(String noteId, Message m) { - List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); - synchronized (noteSocketMap) { - broadcastToWatchers(noteId, StringUtils.EMPTY, m); - List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { - return; - } - socketsToBroadcast = new ArrayList<>(socketLists); - } - LOG.debug("SEND >> " + m); - for (NotebookSocket conn : socketsToBroadcast) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - } - - private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { - List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); - synchronized (noteSocketMap) { - broadcastToWatchers(noteId, StringUtils.EMPTY, m); - List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { - return; - } - socketsToBroadcast = new ArrayList<>(socketLists); - } - - LOG.debug("SEND >> " + m); - for (NotebookSocket conn : socketsToBroadcast) { - if (exclude.equals(conn)) { - continue; - } - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - } - } - - private void multicastToUser(String user, Message m) { - if (!userConnectedSockets.containsKey(user)) { - LOG.warn("Multicasting to user {} that is not in connections map", user); - return; - } - - for (NotebookSocket conn : userConnectedSockets.get(user)) { - unicast(m, conn); - } - } - - private void unicast(Message m, NotebookSocket conn) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } - broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m); + connectionManager.broadcast(m); } public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { - addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); + + connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage), new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) { @Override @@ -648,7 +461,7 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notesJobInfo); - broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); } @@ -660,7 +473,7 @@ public class NotebookServer extends WebSocketServlet } public void unsubscribeNoteJobInfo(NotebookSocket conn) { - removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); + connectionManager.removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); } public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException { @@ -707,23 +520,7 @@ public class NotebookServer extends WebSocketServlet } public void broadcastNote(Note note) { - broadcast(note.getId(), new Message(OP.NOTE).put("note", note)); - } - - public void unicastParagraph(Note note, Paragraph p, String user) { - if (!note.isPersonalizedMode() || p == null || user == null) { - return; - } - - if (!userConnectedSockets.containsKey(user)) { - LOG.warn("Failed to send unicast. user {} that is not in connections map", user); - return; - } - - for (NotebookSocket conn : userConnectedSockets.get(user)) { - Message m = new Message(OP.PARAGRAPH).put("paragraph", p); - unicast(m, conn); - } + connectionManager.broadcast(note.getId(), new Message(OP.NOTE).put("note", note)); } public void broadcastParagraph(Note note, Paragraph p) { @@ -732,8 +529,8 @@ public class NotebookServer extends WebSocketServlet if (note.isPersonalizedMode()) { broadcastParagraphs(p.getUserParagraphMap(), p); } else { - broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", - new ParagraphWithRuntimeInfo(p))); + connectionManager.broadcast(note.getId(), + new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p))); } } @@ -741,7 +538,7 @@ public class NotebookServer extends WebSocketServlet Paragraph defaultParagraph) { if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { - multicastToUser(user, + connectionManager.multicastToUser(user, new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); } } @@ -750,7 +547,7 @@ public class NotebookServer extends WebSocketServlet private void broadcastNewParagraph(Note note, Paragraph para) { LOG.info("Broadcasting paragraph on run call instead of note."); int paraIndex = note.getParagraphs().indexOf(para); - broadcast(note.getId(), + connectionManager.broadcast(note.getId(), new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); } @@ -760,9 +557,10 @@ public class NotebookServer extends WebSocketServlet } //send first to requesting user List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles); - multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); + connectionManager.multicastToUser(subject.getUser(), + new Message(OP.NOTES_INFO).put("notes", notesInfo)); //to others afterwards - broadcastNoteListExcept(notesInfo, subject); + connectionManager.broadcastNoteListExcept(notesInfo, subject); } public void listNotes(NotebookSocket conn, Message message) throws IOException { @@ -772,7 +570,7 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(List<Map<String, String>> notesInfo, ServiceContext context) throws IOException { super.onSuccess(notesInfo, context); - unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); + connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); } }); } @@ -785,30 +583,14 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(List<Map<String, String>> notesInfo, ServiceContext context) throws IOException { super.onSuccess(notesInfo, context); - multicastToUser(context.getAutheInfo().getUser(), + connectionManager.multicastToUser(context.getAutheInfo().getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); //to others afterwards - broadcastNoteListExcept(notesInfo, context.getAutheInfo()); + connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo()); } }); } - private void broadcastNoteListExcept(List<Map<String, String>> notesInfo, - AuthenticationInfo subject) { - Set<String> userAndRoles; - NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); - for (String user : userConnectedSockets.keySet()) { - if (subject.getUser().equals(user)) { - continue; - } - //reloaded already above; parameter - false - userAndRoles = authInfo.getRoles(user); - userAndRoles.add(user); - notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles); - multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo)); - } - } - void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles, Set<String> allowed) throws IOException { LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed); @@ -821,23 +603,6 @@ public class NotebookServer extends WebSocketServlet /** - * @return false if user doesn't have runner permission for this paragraph - */ - private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook, - String noteId, HashSet<String> userAndRoles, - String principal, String op) - throws IOException { - NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); - if (!notebookAuthorization.isRunner(noteId, userAndRoles)) { - permissionError(conn, op, principal, userAndRoles, - notebookAuthorization.getOwners(noteId)); - return false; - } - - return true; - } - - /** * @return false if user doesn't have writer permission for this paragraph */ private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook, @@ -880,7 +645,7 @@ public class NotebookServer extends WebSocketServlet new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note note, ServiceContext context) throws IOException { - addConnectionToNote(note.getId(), conn); + connectionManager.addNoteConnection(note.getId(), conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); } @@ -896,11 +661,11 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(Note note, ServiceContext context) throws IOException { super.onSuccess(note, context); if (note != null) { - addConnectionToNote(note.getId(), conn); + connectionManager.addNoteConnection(note.getId(), conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); } else { - removeConnectionFromAllNote(conn); + connectionManager.removeConnectionFromAllNote(conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); } } @@ -923,7 +688,7 @@ public class NotebookServer extends WebSocketServlet new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note note, ServiceContext context) throws IOException { - broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) + connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) .put("config", config) .put("info", note.getInfo())); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); @@ -1021,7 +786,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(Note note, ServiceContext context) throws IOException { super.onSuccess(note, context); - addConnectionToNote(note.getId(), conn); + connectionManager.addNoteConnection(note.getId(), conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } @@ -1043,7 +808,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onSuccess(String message, ServiceContext context) throws IOException { super.onSuccess(message, context); - removeConn(noteId); + connectionManager.removeNoteConnection(noteId); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } }); @@ -1069,7 +834,7 @@ public class NotebookServer extends WebSocketServlet AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); for (Note note : notes) { notebook.removeNote(note.getId(), subject); - removeConn(note.getId()); + connectionManager.removeNoteConnection(note.getId()); } broadcastNoteList(subject, userAndRoles); } @@ -1186,7 +951,7 @@ public class NotebookServer extends WebSocketServlet private void updateParagraph(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); if (noteId == null) { noteId = (String) fromMessage.get("noteId"); } @@ -1222,7 +987,7 @@ public class NotebookServer extends WebSocketServlet return; } - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); if (noteId == null) { noteId = fromMessage.getType("noteId", LOG); if (noteId == null) { @@ -1265,19 +1030,19 @@ public class NotebookServer extends WebSocketServlet p.setText(paragraphText); Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText) .put("paragraphId", p.getId()); - broadcastExcept(note.getId(), message, conn); + connectionManager.broadcastExcept(note.getId(), message, conn); } private void cloneNote(NotebookSocket conn, Message fromMessage) throws IOException { - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); String name = (String) fromMessage.get("name"); getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage), new WebSocketServiceCallback<Note>(conn) { @Override public void onSuccess(Note newNote, ServiceContext context) throws IOException { super.onSuccess(newNote, context); - addConnectionToNote(newNote.getId(), conn); + connectionManager.addNoteConnection(newNote.getId(), conn); conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); } @@ -1321,13 +1086,13 @@ public class NotebookServer extends WebSocketServlet private void removeParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); getNotebookService().removeParagraph(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){ @Override public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); - broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). + connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). put("id", p.getId())); } }); @@ -1336,14 +1101,14 @@ public class NotebookServer extends WebSocketServlet private void clearParagraphOutput(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); if (p.getNote().isPersonalizedMode()) { - unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); + connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); } else { broadcastParagraph(p.getNote(), p); } @@ -1353,7 +1118,7 @@ public class NotebookServer extends WebSocketServlet private void completion(NotebookSocket conn, Message fromMessage) throws IOException { - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -1452,7 +1217,7 @@ public class NotebookServer extends WebSocketServlet .getId())) { AngularObjectRegistry angularObjectRegistry = setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); - this.broadcastExcept(n.getId(), + connectionManager.broadcastExcept(n.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId()) .put("paragraphId", ao.getParagraphId()), conn); @@ -1460,7 +1225,7 @@ public class NotebookServer extends WebSocketServlet } } } else { // broadcast to all web session for the note - this.broadcastExcept(note.getId(), + connectionManager.broadcastExcept(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) .put("paragraphId", ao.getParagraphId()), conn); @@ -1551,7 +1316,8 @@ public class NotebookServer extends WebSocketServlet final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); - this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); } @@ -1562,7 +1328,8 @@ public class NotebookServer extends WebSocketServlet NotebookSocket conn) { final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); - this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); } @@ -1578,7 +1345,7 @@ public class NotebookServer extends WebSocketServlet angularObject.set(varValue, true); } - this.broadcastExcept(noteId, + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); @@ -1589,7 +1356,7 @@ public class NotebookServer extends WebSocketServlet String interpreterGroupId, NotebookSocket conn) { final AngularObject removed = registry.remove(varName, noteId, paragraphId); if (removed != null) { - this.broadcastExcept(noteId, + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) .put("paragraphId", paragraphId), conn); @@ -1600,14 +1367,14 @@ public class NotebookServer extends WebSocketServlet Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); getNotebookService().moveParagraph(noteId, paragraphId, newIndex, getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) { @Override public void onSuccess(Paragraph result, ServiceContext context) throws IOException { super.onSuccess(result, context); - broadcast(result.getNote().getId(), + connectionManager.broadcast(result.getNote().getId(), new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); } }); @@ -1616,7 +1383,7 @@ public class NotebookServer extends WebSocketServlet private String insertParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); Map<String, Object> config; if (fromMessage.get("config") != null) { config = (Map<String, Object>) fromMessage.get("config"); @@ -1651,7 +1418,7 @@ public class NotebookServer extends WebSocketServlet private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage), new WebSocketServiceCallback<>(conn)); } @@ -1675,7 +1442,7 @@ public class NotebookServer extends WebSocketServlet return; } - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); if (!hasParagraphWriterPermission(conn, notebook, noteId, userAndRoles, fromMessage.principal, "write")) { @@ -1718,14 +1485,14 @@ public class NotebookServer extends WebSocketServlet } // broadcast to other clients only - broadcastExcept(note.getId(), + connectionManager.broadcastExcept(note.getId(), new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn); } private void runParagraph(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); String text = (String) fromMessage.get("paragraph"); String title = (String) fromMessage.get("title"); Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); @@ -1739,7 +1506,7 @@ public class NotebookServer extends WebSocketServlet if (p.getNote().isPersonalizedMode()) { Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId, context.getAutheInfo().getUser()); - unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); + connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); } // if it's the last paragraph and not empty, let's add a new one @@ -1922,7 +1689,7 @@ public class NotebookServer extends WebSocketServlet public void onOutputAppend(String noteId, String paragraphId, int index, String output) { Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("data", output); - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } /** @@ -1939,10 +1706,10 @@ public class NotebookServer extends WebSocketServlet if (note.isPersonalizedMode()) { String user = note.getParagraph(paragraphId).getUser(); if (null != user) { - multicastToUser(user, msg); + connectionManager.multicastToUser(user, msg); } } else { - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } } @@ -1967,7 +1734,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) .put("index", index).put("appId", appId).put("data", output); - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } /** @@ -1979,14 +1746,14 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) .put("index", index).put("type", type).put("appId", appId).put("data", output); - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } @Override public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) { Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId) .put("appId", appId).put("pkg", pkg); - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } @Override @@ -1994,7 +1761,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId) .put("appId", appId).put("status", status); - broadcast(noteId, msg); + connectionManager.broadcast(noteId, msg); } @@ -2086,6 +1853,7 @@ public class NotebookServer extends WebSocketServlet } catch (IOException e) { LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); } + } @Override @@ -2127,7 +1895,7 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notesJobInfo); - broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); } } @@ -2135,7 +1903,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onProgressUpdate(Paragraph p, int progress) { - broadcast(p.getNote().getId(), + connectionManager.broadcast(p.getNote().getId(), new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress)); } @@ -2184,7 +1952,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", paragraph.getNote().getId()) .put("paragraphId", paragraph.getId()).put("data", output); - broadcast(paragraph.getNote().getId(), msg); + connectionManager.broadcast(paragraph.getNote().getId(), msg); } /** @@ -2195,7 +1963,7 @@ public class NotebookServer extends WebSocketServlet Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", paragraph.getNote().getId()) .put("paragraphId", paragraph.getId()).put("data", result.getData()); - broadcast(paragraph.getNote().getId(), msg); + connectionManager.broadcast(paragraph.getNote().getId(), msg); } @Override @@ -2257,7 +2025,8 @@ public class NotebookServer extends WebSocketServlet continue; } - broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object) + connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) .put("paragraphId", object.getParagraphId())); } @@ -2276,7 +2045,7 @@ public class NotebookServer extends WebSocketServlet notebook.getInterpreterSettingManager().getSettingIds(); for (String id : settingIds) { if (interpreterGroupId.contains(id)) { - broadcast(note.getId(), + connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId) .put("paragraphId", paragraphId)); break; @@ -2288,7 +2057,7 @@ public class NotebookServer extends WebSocketServlet private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("paragraphId"); String replName = (String) fromMessage.get("magic"); - String noteId = getOpenNoteId(conn); + String noteId = connectionManager.getAssociatedNoteId(conn); getNotebookService().getEditorSetting(noteId, replName, getServiceContext(fromMessage), @@ -2317,68 +2086,6 @@ public class NotebookServer extends WebSocketServlet new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings))); } - private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived) { - if (!isSessionAllowedToSwitchToWatcher(conn)) { - LOG.error("Cannot switch this client to watcher, invalid security key"); - return; - } - LOG.info("Going to add {} to watcher socket", conn); - // add the connection to the watcher. - if (watcherSockets.contains(conn)) { - LOG.info("connection alrerady present in the watcher"); - return; - } - watcherSockets.add(conn); - - // remove this connection from regular zeppelin ws usage. - removeConnectionFromAllNote(conn); - connectedSockets.remove(conn); - removeUserConnection(conn.getUser(), conn); - } - - private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) { - String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER); - return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey - .equals(WatcherSecurityKey.getKey())); - } - - /** - * Send websocket message to all connections regardless of notebook id. - */ - private void broadcastToAllConnections(String serialized) { - broadcastToAllConnectionsExcept(null, serialized); - } - - private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) { - synchronized (connectedSockets) { - for (NotebookSocket conn : connectedSockets) { - if (exclude != null && exclude.equals(conn)) { - continue; - } - - try { - conn.send(serialized); - } catch (IOException e) { - LOG.error("Cannot broadcast message to watcher", e); - } - } - } - } - - private void broadcastToWatchers(String noteId, String subject, Message message) { - synchronized (watcherSockets) { - for (NotebookSocket watcher : watcherSockets) { - try { - watcher.send( - WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message)) - .build().toJson()); - } catch (IOException e) { - LOG.error("Cannot broadcast message to watcher", e); - } - } - } - } - @Override public void onParaInfosReceived(String noteId, String paragraphId, String interpreterSettingId, Map<String, String> metaInfos) { @@ -2397,7 +2104,7 @@ public class NotebookServer extends WebSocketServlet paragraph .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); - broadcast( + connectionManager.broadcast( note.getId(), new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", paragraph.getRuntimeInfos())); @@ -2409,7 +2116,8 @@ public class NotebookServer extends WebSocketServlet GUI formsSettings = new GUI(); formsSettings.setForms(note.getNoteForms()); formsSettings.setParams(note.getNoteParams()); - broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings)); + connectionManager.broadcast(note.getId(), + new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings)); } private void saveNoteForms(NotebookSocket conn, @@ -2442,18 +2150,14 @@ public class NotebookServer extends WebSocketServlet @Override public Set<String> getConnectedUsers() { - Set<String> connectionList = Sets.newHashSet(); - for (NotebookSocket notebookSocket : connectedSockets) { - connectionList.add(notebookSocket.getUser()); - } - return connectionList; + return connectionManager.getConnectedUsers(); } @Override public void sendMessage(String message) { Message m = new Message(OP.NOTICE); m.data.put("notice", message); - broadcast(m); + connectionManager.broadcast(m); } private ServiceContext getServiceContext(Message message) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index abce8b6..03e7ee6 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -300,7 +300,7 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived); @@ -349,7 +349,7 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived); @@ -393,7 +393,7 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived); @@ -440,7 +440,7 @@ public class NotebookServerTest extends AbstractTestRestApi { .put("interpreterGroupId", "mdGroup") .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - server.noteSocketMap.put("noteId", asList(conn, otherConn)); + server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); // When server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived);
