change name and jump version
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0e3a2728 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0e3a2728 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0e3a2728 Branch: refs/heads/devel-3 Commit: 0e3a2728aaab37fc1ce938efef33649899f166a3 Parents: a3e861e Author: David Yan <[email protected]> Authored: Tue Oct 15 20:00:51 2013 -0700 Committer: David Yan <[email protected]> Committed: Fri Aug 28 10:56:56 2015 -0700 ---------------------------------------------------------------------- PubSubWebSocketServlet.java | 284 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0e3a2728/PubSubWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java new file mode 100644 index 0000000..cd7a71f --- /dev/null +++ b/PubSubWebSocketServlet.java @@ -0,0 +1,284 @@ +/* + * Copyright (c) 2012-2013 DataTorrent, Inc. + * All Rights Reserved. + */ +package com.datatorrent.gateway; + +import com.datatorrent.api.util.JacksonObjectMapperProvider; +import com.datatorrent.api.util.PubSubMessage; +import com.datatorrent.api.util.PubSubMessage.PubSubMessageType; +import com.datatorrent.api.util.PubSubMessageCodec; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import javax.servlet.http.HttpServletRequest; +import org.codehaus.jackson.map.ObjectMapper; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>PubSubWebSocketServlet class.</p> + * + * @author David Yan <[email protected]> + * @since 0.3.2 + */ +public class PubSubWebSocketServlet extends WebSocketServlet +{ + private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class); + private static final long serialVersionUID = 1L; + private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>(); + private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>(); + private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null); + private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper); + private InternalMessageHandler internalMessageHandler = null; + + public interface InternalMessageHandler + { + void onMessage(String topic, Object data); + + } + + /* + private int timeout; + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + */ + + /* + private int timeout; + public void setTimeout(int timeout) { + this.timeout = timeout; + } + */ + public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler) + { + this.internalMessageHandler = internalMessageHandler; + } + + @Override + public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol) + { + return new PubSubWebSocket(); + } + + private synchronized void subscribe(PubSubWebSocket webSocket, String topic) + { + HashSet<PubSubWebSocket> wsSet; + if (!topicToSocketMap.containsKey(topic)) { + wsSet = new HashSet<PubSubWebSocket>(); + topicToSocketMap.put(topic, wsSet); + } + else { + wsSet = topicToSocketMap.get(topic); + } + wsSet.add(webSocket); + + HashSet<String> topicSet; + if (!socketToTopicMap.containsKey(webSocket)) { + topicSet = new HashSet<String>(0); + socketToTopicMap.put(webSocket, topicSet); + } + else { + topicSet = socketToTopicMap.get(webSocket); + } + topicSet.add(topic); + publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + } + + private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic) + { + if (!topicToSocketMap.containsKey(topic)) { + return; + } + HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); + wsSet.remove(webSocket); + if (wsSet.isEmpty()) { + topicToSocketMap.remove(topic); + } + if (!socketToTopicMap.containsKey(webSocket)) { + return; + } + HashSet<String> topicSet = socketToTopicMap.get(webSocket); + topicSet.remove(topic); + if (topicSet.isEmpty()) { + socketToTopicMap.remove(webSocket); + } + publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + } + + private synchronized void unsubscribeAll(PubSubWebSocket webSocket) + { + HashSet<String> topicSet = socketToTopicMap.get(webSocket); + if (topicSet != null) { + for (String topic : topicSet) { + HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); + wsSet.remove(webSocket); + if (wsSet.isEmpty()) { + topicToSocketMap.remove(topic); + } + publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + } + socketToTopicMap.remove(webSocket); + } + } + + private synchronized void disconnect(PubSubWebSocket webSocket) + { + unsubscribeAll(webSocket); + } + + public synchronized int getNumSubscribers(String topic) + { + HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); + return wsSet == null ? 0 : wsSet.size(); + } + + private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException + { + PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>(); + pubSubMessage.setType(PubSubMessageType.DATA); + pubSubMessage.setTopic(topic); + pubSubMessage.setData(data); + LOG.debug("Sending data of {} to subscriber...", topic); + webSocket.sendMessage(codec.formatMessage(pubSubMessage)); + } + + public synchronized void publish(String topic, Object data) + { + HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); + if (wsSet != null) { + Iterator<PubSubWebSocket> it = wsSet.iterator(); + while (it.hasNext()) { + PubSubWebSocket socket = it.next(); + try { + sendData(socket, topic, data); + } + catch (Exception ex) { + it.remove(); + disconnect(socket); + } + } + } + } + + private class PubSubWebSocket implements WebSocket.OnTextMessage + { + private Connection connection; + private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32); + private Thread messengerThread = new Thread(new Messenger()); + + @Override + public void onMessage(String message) + { + LOG.debug("Received message {}", message); + try { + @SuppressWarnings("unchecked") + PubSubMessage<Object> pubSubMessage = codec.parseMessage(message); + if (pubSubMessage != null) { + PubSubMessageType type = pubSubMessage.getType(); + String topic = pubSubMessage.getTopic(); + if (type != null) { + if (type.equals(PubSubMessageType.SUBSCRIBE)) { + if (topic != null) { + subscribe(this, topic); + } + } + else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) { + if (topic != null) { + unsubscribe(this, topic); + } + } + else if (type.equals(PubSubMessageType.PUBLISH)) { + if (topic != null) { + Object data = pubSubMessage.getData(); + if (data != null) { + publish(topic, data); + } + if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) { + if (internalMessageHandler != null) { + internalMessageHandler.onMessage(topic, data); + } + } + } + } + else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) { + if (topic != null) { + subscribe(this, topic + ".numSubscribers"); + sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + } + } + else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) { + if (topic != null) { + unsubscribe(this, topic + ".numSubscribers"); + } + } + } + } + } + catch (Exception ex) { + LOG.warn("Exception caught", ex); + } + } + + @Override + public void onOpen(Connection connection) + { + LOG.debug("onOpen"); + this.connection = connection; + this.connection.setMaxIdleTime(60 * 60 * 1000); // idle time set to one hour to clear out idle connections from taking resources + messengerThread.start(); + } + + @Override + public void onClose(int i, String string) + { + LOG.debug("onClose"); + disconnect(this); + messengerThread.interrupt(); + } + + public void sendMessage(String message) throws IllegalStateException + { + messageQueue.add(message); + } + + /* + * This class exists only because Jetty 8 does not support async write for websocket + * + */ + private class Messenger implements Runnable + { + @Override + public void run() + { + while (!Thread.interrupted()) { + try { + String message = messageQueue.take(); + // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another + // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7 + // When we can use Java 7, we need to upgrade to Jetty 9. + connection.sendMessage(message); + } + catch (InterruptedException ex) { + return; + } + catch (Exception ex) { + LOG.error("Caught exception in websocket messenger.", ex); + return; + } + } + } + + } + + } + +}
