SPOI-1770 exclude special topics in latest topic list
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9a9f1538 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9a9f1538 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9a9f1538 Branch: refs/heads/devel-3 Commit: 9a9f15381607c34b56e600a8083bdd41caf6f137 Parents: 4e5c9e5 Author: David Yan <[email protected]> Authored: Thu Jan 16 18:12:04 2014 -0800 Committer: David Yan <[email protected]> Committed: Fri Aug 28 10:56:56 2015 -0700 ---------------------------------------------------------------------- PubSubWebSocketServlet.java | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9a9f1538/PubSubWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java index 21bfd73..11bf98e 100644 --- a/PubSubWebSocketServlet.java +++ b/PubSubWebSocketServlet.java @@ -4,23 +4,26 @@ */ package com.datatorrent.gateway; -import com.datatorrent.api.util.JacksonObjectMapperProvider; -import com.datatorrent.api.util.PubSubMessage; -import com.datatorrent.api.util.PubSubMessage.PubSubMessageType; -import com.datatorrent.api.util.PubSubMessageCodec; -import com.datatorrent.stram.util.LRUCache; - import java.io.IOException; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; + import javax.servlet.http.HttpServletRequest; + import org.codehaus.jackson.map.ObjectMapper; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketServlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.util.JacksonObjectMapperProvider; +import com.datatorrent.lib.util.PubSubMessage; +import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType; +import com.datatorrent.lib.util.PubSubMessageCodec; + +import com.datatorrent.stram.util.LRUCache; + /** * <p>PubSubWebSocketServlet class.</p> * @@ -37,8 +40,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper); private InternalMessageHandler internalMessageHandler = null; private static final int latestTopicCount = 100; - private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false) + private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false) { + private static final long serialVersionUID = 20140131L; @Override public Long put(String key, Long value) { @@ -100,7 +104,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet topicSet = socketToTopicMap.get(webSocket); } topicSet.add(topic); - publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic)); } private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic) @@ -121,7 +125,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet if (topicSet.isEmpty()) { socketToTopicMap.remove(webSocket); } - publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic)); } private synchronized void unsubscribeAll(PubSubWebSocket webSocket) @@ -134,7 +138,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet if (wsSet.isEmpty()) { topicToSocketMap.remove(topic); } - publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic)); } socketToTopicMap.remove(webSocket); } @@ -163,7 +167,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet public synchronized void publish(String topic, Object data) { - latestTopics.put(topic, System.currentTimeMillis()); + if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) { + latestTopics.put(topic, System.currentTimeMillis()); + } HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); if (wsSet != null) { Iterator<PubSubWebSocket> it = wsSet.iterator(); @@ -183,8 +189,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet private class PubSubWebSocket implements WebSocket.OnTextMessage { private Connection connection; - private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32); - private Thread messengerThread = new Thread(new Messenger()); + private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32); + private final Thread messengerThread = new Thread(new Messenger()); @Override public void onMessage(String message) @@ -222,13 +228,13 @@ public class PubSubWebSocketServlet extends WebSocketServlet } else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) { if (topic != null) { - subscribe(this, topic + ".numSubscribers"); - sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic))); + subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX); + sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic)); } } else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) { if (topic != null) { - unsubscribe(this, topic + ".numSubscribers"); + unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX); } } else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
