SPOI-1770 exposing list of latest 100 topics
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4e5c9e56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4e5c9e56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4e5c9e56 Branch: refs/heads/devel-3 Commit: 4e5c9e561f34aed94f6f205f8b13cd290419f5a9 Parents: 0e3a272 Author: David Yan <[email protected]> Authored: Thu Jan 16 18:02:44 2014 -0800 Committer: David Yan <[email protected]> Committed: Fri Aug 28 10:56:56 2015 -0700 ---------------------------------------------------------------------- PubSubWebSocketServlet.java | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4e5c9e56/PubSubWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java index cd7a71f..21bfd73 100644 --- a/PubSubWebSocketServlet.java +++ b/PubSubWebSocketServlet.java @@ -8,11 +8,10 @@ import com.datatorrent.api.util.JacksonObjectMapperProvider; import com.datatorrent.api.util.PubSubMessage; import com.datatorrent.api.util.PubSubMessage.PubSubMessageType; import com.datatorrent.api.util.PubSubMessageCodec; +import com.datatorrent.stram.util.LRUCache; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import javax.servlet.http.HttpServletRequest; @@ -37,6 +36,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null); private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper); private InternalMessageHandler internalMessageHandler = null; + private static final int latestTopicCount = 100; + private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false) + { + @Override + public Long put(String key, Long value) + { + remove(key); // this is to make the key the most recently inserted entry + return super.put(key, value); + } + + }; public interface InternalMessageHandler { @@ -153,6 +163,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet public synchronized void publish(String topic, Object data) { + latestTopics.put(topic, System.currentTimeMillis()); HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic); if (wsSet != null) { Iterator<PubSubWebSocket> it = wsSet.iterator(); @@ -220,6 +231,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet unsubscribe(this, topic + ".numSubscribers"); } } + else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) { + synchronized (this) { + sendData(this, "_latestTopics", latestTopics.keySet()); + } + } } } }
