Add JMX query support for diagnostic events

patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-14435


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a79e5903
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a79e5903
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a79e5903

Branch: refs/heads/trunk
Commit: a79e5903b552e40f77c151e23172f054ffb7f39e
Parents: 2846b22
Author: Stefan Podkowinski <[email protected]>
Authored: Wed May 2 13:03:10 2018 +0200
Committer: Stefan Podkowinski <[email protected]>
Committed: Fri Aug 17 14:07:45 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   1 -
 .../diag/DiagnosticEventPersistence.java        | 151 ++++++++++++++++
 .../cassandra/diag/DiagnosticEventService.java  |  65 ++++++-
 .../diag/DiagnosticEventServiceMBean.java       |  59 +++++++
 .../cassandra/diag/LastEventIdBroadcaster.java  | 150 ++++++++++++++++
 .../diag/LastEventIdBroadcasterMBean.java       |  41 +++++
 .../diag/store/DiagnosticEventMemoryStore.java  |  97 +++++++++++
 .../diag/store/DiagnosticEventStore.java        |  52 ++++++
 .../cassandra/service/StorageService.java       |   5 +-
 .../progress/jmx/JMXBroadcastExecutor.java      |  35 ++++
 .../DiagnosticEventPersistenceBench.java        |  73 ++++++++
 .../microbench/DiagnosticEventServiceBench.java | 103 +++++++++++
 .../config/OverrideConfigurationLoader.java     |  47 +++++
 .../diag/DiagnosticEventServiceTest.java        |   6 +-
 .../store/DiagnosticEventMemoryStoreTest.java   | 170 +++++++++++++++++++
 16 files changed, 1047 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ceba843..097e7dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
  * Add base classes for diagnostic events (CASSANDRA-13457)
  * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
  * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase 
(CASSANDRA-14637)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 65a34f0..aa5ca92 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2541,7 +2541,6 @@ public class DatabaseDescriptor
         return conf.diagnostic_events_enabled;
     }
 
-    @VisibleForTesting
     public static void setDiagnosticEventsEnabled(boolean enabled)
     {
         conf.diagnostic_events_enabled = enabled;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java 
b/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java
new file mode 100644
index 0000000..7da335c
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag;
+
+import java.io.InvalidClassException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.diag.store.DiagnosticEventMemoryStore;
+import org.apache.cassandra.diag.store.DiagnosticEventStore;
+
+
+/**
+ * Manages storing and retrieving events based on enabled {@link 
DiagnosticEventStore} implementation.
+ */
+public final class DiagnosticEventPersistence
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(DiagnosticEventPersistence.class);
+
+    private static final DiagnosticEventPersistence instance = new 
DiagnosticEventPersistence();
+
+    private final Map<Class, DiagnosticEventStore<Long>> stores = new 
ConcurrentHashMap<>();
+
+    private final Consumer<DiagnosticEvent> eventConsumer = this::onEvent;
+
+    public static void start()
+    {
+        // make sure id broadcaster is initialized (registered as MBean)
+        LastEventIdBroadcaster.instance();
+    }
+
+    public static DiagnosticEventPersistence instance()
+    {
+        return instance;
+    }
+
+    public SortedMap<Long, Map<String, Serializable>> getEvents(String 
eventClazz, Long key, int limit, boolean includeKey)
+    {
+        assert eventClazz != null;
+        assert key != null;
+        assert limit >= 0;
+
+        Class cls;
+        try
+        {
+            cls = getEventClass(eventClazz);
+        }
+        catch (ClassNotFoundException | InvalidClassException e)
+        {
+            throw new RuntimeException(e);
+        }
+        DiagnosticEventStore<Long> store = getStore(cls);
+
+        NavigableMap<Long, DiagnosticEvent> events = store.scan(key, 
includeKey ? limit : limit + 1);
+        if (!includeKey && !events.isEmpty()) events = events.tailMap(key, 
false);
+        TreeMap<Long, Map<String, Serializable>> ret = new TreeMap<>();
+        for (Map.Entry<Long, DiagnosticEvent> entry : events.entrySet())
+        {
+            DiagnosticEvent event = entry.getValue();
+            HashMap<String, Serializable> val = new HashMap<>(event.toMap());
+            val.put("class", event.getClass().getName());
+            val.put("type", event.getType().name());
+            val.put("ts", event.timestamp);
+            val.put("thread", event.threadName);
+            ret.put(entry.getKey(), val);
+        }
+        logger.debug("Returning {} {} events for key {} (limit {}) (includeKey 
{})", ret.size(), eventClazz, key, limit, includeKey);
+        return ret;
+    }
+
+    public void enableEventPersistence(String eventClazz)
+    {
+        try
+        {
+            logger.debug("Enabling events: {}", eventClazz);
+            
DiagnosticEventService.instance().subscribe(getEventClass(eventClazz), 
eventConsumer);
+        }
+        catch (ClassNotFoundException | InvalidClassException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void disableEventPersistence(String eventClazz)
+    {
+        try
+        {
+            logger.debug("Disabling events: {}", eventClazz);
+            
DiagnosticEventService.instance().unsubscribe(getEventClass(eventClazz), 
eventConsumer);
+        }
+        catch (ClassNotFoundException | InvalidClassException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void onEvent(DiagnosticEvent event)
+    {
+        Class<? extends DiagnosticEvent> cls = event.getClass();
+        if (logger.isTraceEnabled())
+            logger.trace("Persisting received {} event", cls.getName());
+        DiagnosticEventStore<Long> store = getStore(cls);
+        store.store(event);
+        
LastEventIdBroadcaster.instance().setLastEventId(event.getClass().getName(), 
store.getLastEventId());
+    }
+
+    private Class<DiagnosticEvent> getEventClass(String eventClazz) throws 
ClassNotFoundException, InvalidClassException
+    {
+        // get class by eventClazz argument name
+        // restrict class loading for security reasons
+        if (!eventClazz.startsWith("org.apache.cassandra."))
+            throw new RuntimeException("Not a Cassandra event class: " + 
eventClazz);
+
+        Class<DiagnosticEvent> clazz = (Class<DiagnosticEvent>) 
Class.forName(eventClazz);
+
+        if (!(DiagnosticEvent.class.isAssignableFrom(clazz)))
+            throw new InvalidClassException("Event class must be of type 
DiagnosticEvent");
+
+        return clazz;
+    }
+
+    private DiagnosticEventStore<Long> getStore(Class cls)
+    {
+        return stores.computeIfAbsent(cls, (storeKey) -> new 
DiagnosticEventMemoryStore());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java 
b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
index 577d0ba..3f3de7c 100644
--- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
@@ -17,12 +17,19 @@
  */
 package org.apache.cassandra.diag;
 
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.function.Consumer;
 
+import javax.annotation.Nullable;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
@@ -38,7 +45,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 /**
  * Service for publishing and consuming {@link DiagnosticEvent}s.
  */
-public final class DiagnosticEventService
+public final class DiagnosticEventService implements 
DiagnosticEventServiceMBean
 {
     private static final Logger logger = 
LoggerFactory.getLogger(DiagnosticEventService.class);
 
@@ -55,6 +62,20 @@ public final class DiagnosticEventService
 
     private DiagnosticEventService()
     {
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            ObjectName jmxObjectName = new 
ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService");
+            mbs.registerMBean(this, jmxObjectName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        // register broadcasters for JMX events
+        DiagnosticEventPersistence.start();
     }
 
     /**
@@ -99,10 +120,12 @@ public final class DiagnosticEventService
      */
     public synchronized <E extends DiagnosticEvent> void subscribe(Class<E> 
event, Consumer<E> consumer)
     {
+        logger.debug("Adding subscriber: {}", consumer);
         subscribersByClass = ImmutableSetMultimap.<Class<? extends 
DiagnosticEvent>, Consumer<DiagnosticEvent>>builder()
                               .putAll(subscribersByClass)
                               .put(event, new TypedConsumerWrapper<>(consumer))
                               .build();
+        logger.debug("Total subscribers: {}", 
subscribersByClass.values().size());
     }
 
     /**
@@ -149,6 +172,16 @@ public final class DiagnosticEventService
      */
     public synchronized <E extends DiagnosticEvent> void 
unsubscribe(Consumer<E> consumer)
     {
+        unsubscribe(null, consumer);
+    }
+
+    /**
+     * De-registers event handler from receiving any further events.
+     * @param event DiagnosticEvent class to unsubscribe from, or {@code null} for all event classes
+     * @param consumer Consumer registered for receiving events
+     */
+    public synchronized <E extends DiagnosticEvent> void unsubscribe(@Nullable 
Class<E> event, Consumer<E> consumer)
+    {
         // all events
         subscribersAll = ImmutableSet.copyOf(Iterables.filter(subscribersAll, 
(c) -> c != consumer));
 
@@ -161,7 +194,8 @@ public final class DiagnosticEventService
             if (subscriber instanceof TypedConsumerWrapper)
                 subscriber = ((TypedConsumerWrapper)subscriber).wrapped;
 
-            if (subscriber != consumer)
+            // other consumers or other events
+            if (subscriber != consumer || (event != null && 
!entry.getKey().equals(event)))
             {
                 byClassBuilder = byClassBuilder.put(entry);
             }
@@ -181,7 +215,7 @@ public final class DiagnosticEventService
                 Consumer<DiagnosticEvent> subscriber = e.getValue();
                 if (subscriber instanceof TypedConsumerWrapper)
                     subscriber = ((TypedConsumerWrapper) subscriber).wrapped;
-                return subscriber != consumer;
+                return subscriber != consumer || (event != null && 
!byClassEntry.getKey().equals(event));
             }).forEach(byTypeBuilder::put);
 
             ImmutableSetMultimap<Enum<?>, Consumer<DiagnosticEvent>> byType = 
byTypeBuilder.build();
@@ -258,6 +292,31 @@ public final class DiagnosticEventService
         subscribersByClassAndType = ImmutableMap.of();
     }
 
+    public boolean isDiagnosticsEnabled()
+    {
+        return DatabaseDescriptor.diagnosticEventsEnabled();
+    }
+
+    public void disableDiagnostics()
+    {
+        DatabaseDescriptor.setDiagnosticEventsEnabled(false);
+    }
+
+    public SortedMap<Long, Map<String, Serializable>> readEvents(String 
eventClazz, Long lastKey, int limit)
+    {
+        return DiagnosticEventPersistence.instance().getEvents(eventClazz, 
lastKey, limit, false);
+    }
+
+    public void enableEventPersistence(String eventClazz)
+    {
+        
DiagnosticEventPersistence.instance().enableEventPersistence(eventClazz);
+    }
+
+    public void disableEventPersistence(String eventClazz)
+    {
+        
DiagnosticEventPersistence.instance().disableEventPersistence(eventClazz);
+    }
+
     /**
      * Wrapper class for supporting typed event handling for consumers.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java 
b/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java
new file mode 100644
index 0000000..b3af8a7
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Provides JMX enabled attributes and operations implemented by {@link 
DiagnosticEventService}.
+ */
+public interface DiagnosticEventServiceMBean
+{
+    /**
+     * Indicates if any events will be published.
+     */
+    boolean isDiagnosticsEnabled();
+
+    /**
+     * Kill switch for disabling all events immediately, without restarting 
the node. Please edit cassandra.yaml for
+     * making this permanent.
+     */
+    void disableDiagnostics();
+
+    /**
+     * Retrieves events of the specified type that follow the provided key. 
Result will be sorted chronologically.
+     *
+     * @param eventClazz fqn of event class
+     * @param lastKey ID of the last event already retrieved; only events with a greater ID are returned
+     * @param limit number of results to return
+     */
+    SortedMap<Long, Map<String, Serializable>> readEvents(String eventClazz, 
Long lastKey, int limit);
+
+    /**
+     * Start storing events to make them available via {@link 
#readEvents(String, Long, int)}.
+     */
+    void enableEventPersistence(String eventClazz);
+
+    /**
+     * Stop storing events.
+     */
+    void disableEventPersistence(String eventClazz);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java 
b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
new file mode 100644
index 0000000..9fe5c48
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
+
+/**
+ * Broadcaster for notifying JMX clients on newly available data. Periodically 
sends {@link Notification}s
+ * containing a list of event types and greatest event IDs. Consumers may use 
this information to
+ * query or poll events based on this data.
+ */
+final class LastEventIdBroadcaster extends NotificationBroadcasterSupport 
implements LastEventIdBroadcasterMBean
+{
+
+    private final static LastEventIdBroadcaster instance = new 
LastEventIdBroadcaster();
+
+    private final static int PERIODIC_BROADCAST_INTERVAL_MILLIS = 30000;
+    private final static int SHORT_TERM_BROADCAST_DELAY_MILLIS = 1000;
+
+    private final AtomicLong notificationSerialNumber = new AtomicLong();
+    private final AtomicReference<ScheduledFuture<?>> 
scheduledPeriodicalBroadcast = new AtomicReference<>();
+    private final AtomicReference<ScheduledFuture<?>> 
scheduledShortTermBroadcast = new AtomicReference<>();
+
+    private final Map<String, Comparable> summary = new ConcurrentHashMap<>();
+
+
+    private LastEventIdBroadcaster()
+    {
+        // use dedicated executor for handling JMX notifications
+        super(JMXBroadcastExecutor.executor);
+
+        summary.put("last_updated_at", 0L);
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            ObjectName jmxObjectName = new 
ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster");
+            mbs.registerMBean(this, jmxObjectName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static LastEventIdBroadcaster instance()
+    {
+        return instance;
+    }
+
+    public Map<String, Comparable> getLastEventIds()
+    {
+        return summary;
+    }
+
+    public Map<String, Comparable> getLastEventIdsIfModified(long lastUpdate)
+    {
+        // null means "not modified": the summary has not changed after lastUpdate (see LastEventIdBroadcasterMBean)
+        return lastUpdate >= (long)summary.get("last_updated_at") ? null : getLastEventIds();
+    }
+
+    public synchronized void addNotificationListener(NotificationListener 
listener, NotificationFilter filter, Object handback)
+    {
+        super.addNotificationListener(listener, filter, handback);
+
+        // lazily schedule periodical broadcast once we got our first 
subscriber
+        if (scheduledPeriodicalBroadcast.get() == null)
+        {
+            ScheduledFuture<?> scheduledFuture = 
ScheduledExecutors.scheduledTasks
+                                                 
.scheduleAtFixedRate(this::broadcastEventIds,
+                                                                      
PERIODIC_BROADCAST_INTERVAL_MILLIS,
+                                                                      
PERIODIC_BROADCAST_INTERVAL_MILLIS,
+                                                                      
TimeUnit.MILLISECONDS);
+            if (!this.scheduledPeriodicalBroadcast.compareAndSet(null, 
scheduledFuture))
+                scheduledFuture.cancel(false);
+        }
+    }
+
+    public void setLastEventId(String key, Comparable id)
+    {
+        // ensure monotonic properties of ids
+        if (summary.compute(key, (k, v) -> v == null ? id : id.compareTo(v) > 
0 ? id : v) == id) {
+            summary.put("last_updated_at", System.currentTimeMillis());
+            scheduleBroadcast();
+        }
+    }
+
+    private void scheduleBroadcast()
+    {
+        // schedule broadcast for timely announcing new events before next 
periodical broadcast
+        // this should allow us to buffer new updates for a while, while 
keeping broadcasts near-time
+        ScheduledFuture<?> running = scheduledShortTermBroadcast.get();
+        if (running == null || running.isDone())
+        {
+            ScheduledFuture<?> scheduledFuture = 
ScheduledExecutors.scheduledTasks
+                                                 
.schedule((Runnable)this::broadcastEventIds,
+                                                           
SHORT_TERM_BROADCAST_DELAY_MILLIS,
+                                                           
TimeUnit.MILLISECONDS);
+            if (!this.scheduledShortTermBroadcast.compareAndSet(running, 
scheduledFuture))
+                scheduledFuture.cancel(false);
+        }
+    }
+
+    private void broadcastEventIds()
+    {
+        if (!summary.isEmpty())
+            broadcastEventIds(summary);
+    }
+
+    private void broadcastEventIds(Map<String, Comparable> summary)
+    {
+        Notification notification = new Notification("event_last_id_summary",
+                                                     "LastEventIdBroadcaster",
+                                                     
notificationSerialNumber.incrementAndGet(),
+                                                     
System.currentTimeMillis(),
+                                                     "Event last IDs summary");
+        notification.setUserData(summary);
+        sendNotification(notification);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java 
b/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java
new file mode 100644
index 0000000..03f05dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java
@@ -0,0 +1,41 @@
+package org.apache.cassandra.diag;
+
+import java.util.Map;
+
+/**
+ * Provides a list of event types and the corresponding highest event IDs. 
Consumers may use these IDs to determine
+ * if new data is available.
+ *
+ * <p>Example result</p>
+ *
+ * <table>
+ *     <tr>
+ *         <th>Event</th>
+ *         <th>Last ID</th>
+ *     </tr>
+ *     <tr>
+ *         <td>BootstrapEvent</td>
+ *         <td>312</td>
+ *     </tr>
+ *     <tr>
+ *         <td>CompactionEvent</td>
+ *         <td>a53f9338-5f24-11e8-9c2d-fa7ae01bbebc</td>
+ *     </tr>
+ * </table>
+ *
+ * <p>Clients may either retrieve the current list of all event IDs, or make 
conditional requests for event IDs
+ * based on the timestamp of the last update (much in the sense of e.g. HTTP's 
If-Modified-Since semantics).</p>
+ */
+public interface LastEventIdBroadcasterMBean
+{
+    /**
+     * Retrieves a list of all event types and their highest IDs.
+     */
+    Map<String, Comparable> getLastEventIds();
+
+    /**
+     * Retrieves a list of all event types and their highest IDs, if updated 
since specified timestamp, or null.
+     * @param lastUpdate timestamp to use to determine if IDs have been updated
+     */
+    Map<String, Comparable> getLastEventIdsIfModified(long lastUpdate);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java 
b/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java
new file mode 100644
index 0000000..92fd42b
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag.store;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * Simple on-heap memory store that allows to buffer and retrieve a fixed 
number of events.
+ */
+public final class DiagnosticEventMemoryStore implements 
DiagnosticEventStore<Long>
+{
+    private final AtomicLong lastKey = new AtomicLong(0);
+
+    private int maxSize = 200;
+
+    // event access will mostly happen based on a recent event offset, so we 
add new events to the head of the list
+    // for optimized search times
+    private final ConcurrentSkipListMap<Long, DiagnosticEvent> events = new 
ConcurrentSkipListMap<>(Comparator.reverseOrder());
+
+    public void load()
+    {
+        // no-op
+    }
+
+    public void store(DiagnosticEvent event)
+    {
+        long keyHead = lastKey.incrementAndGet();
+        events.put(keyHead, event);
+
+        // remove the oldest elements once the store exceeds max size
+        if (keyHead > maxSize) events.tailMap(keyHead - maxSize).clear();
+    }
+
+    public NavigableMap<Long, DiagnosticEvent> scan(Long id, int limit)
+    {
+        assert id != null && id >= 0;
+        assert limit >= 0;
+
+        // [10..1].headMap(3, true): [10..3]
+        ConcurrentNavigableMap<Long, DiagnosticEvent> newerEvents = 
events.headMap(id, true);
+        // [3..10]
+        ConcurrentNavigableMap<Long, DiagnosticEvent> ret = 
newerEvents.descendingMap();
+        if (limit == 0)
+        {
+            return ret;
+        }
+        else
+        {
+            Map.Entry<Long, DiagnosticEvent> first = ret.firstEntry();
+            if (first == null) return ret;
+            else return ret.headMap(first.getKey() + limit);
+        }
+    }
+
+    public Long getLastEventId()
+    {
+        return lastKey.get();
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return events.size();
+    }
+
+    @VisibleForTesting
+    void setMaxSize(int maxSize)
+    {
+        this.maxSize = maxSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java 
b/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java
new file mode 100644
index 0000000..86b2df3
--- /dev/null
+++ b/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag.store;
+
+import java.util.NavigableMap;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * Enables storing and retrieving {@link DiagnosticEvent}s.
+ * @param <T> type of key that is used to reference an event
+ */
+public interface DiagnosticEventStore<T extends Comparable<T>>
+{
+    /**
+     * Initializes the store.
+     */
+    void load();
+
+    /**
+     * Stores the provided event. Nothing is returned; see {@link #getLastEventId()} for the greatest key.
+     */
+    void store(DiagnosticEvent event);
+
+    /**
+     * Returns a view on all events with a key greater than the provided value 
(inclusive) up to the specified
+     * number of results. Events may be added or become unavailable over time. 
Keys must be unique, sortable and
+     * monotonically incrementing. Returns an empty map in case no events 
could be found.
+     */
+    NavigableMap<T, DiagnosticEvent> scan(T key, int limit);
+
+    /**
+     * Returns the greatest event ID that can be used to fetch events via 
{@link #scan(Comparable, int)}.
+     */
+    T getLastEventId();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index cc02e17..84ec49d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -111,6 +111,7 @@ import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 import org.apache.cassandra.utils.logging.LoggingSupportFactory;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 
 import static java.util.Arrays.asList;
@@ -261,8 +262,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public StorageService()
     {
-        // use dedicated executor for sending JMX notifications
-        super(Executors.newSingleThreadExecutor());
+        // use dedicated executor for handling JMX notifications
+        super(JMXBroadcastExecutor.executor);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java 
b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
new file mode 100644
index 0000000..f545f0f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Holds dedicated executor for JMX event handling. Events will be internally queued by ArrayNotificationBuffer,
+ * synchronized by an exclusive write lock, which makes a shared single thread executor desirable.
+ */
+public final class JMXBroadcastExecutor
+{
+
+    private JMXBroadcastExecutor() { }
+
+    public final static Executor executor = 
Executors.newSingleThreadExecutor();
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java
 
b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java
new file mode 100644
index 0000000..5ddf0c5
--- /dev/null
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.OverrideConfigurationLoader;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.diag.DiagnosticEventPersistence;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import 
org.apache.cassandra.test.microbench.DiagnosticEventServiceBench.DummyEvent;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@BenchmarkMode(Mode.All)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2)
+@Threads(4)
+@State(Scope.Benchmark)
+public class DiagnosticEventPersistenceBench
+{
+    private DiagnosticEventService service = DiagnosticEventService.instance();
+    private DiagnosticEventPersistence persistence = 
DiagnosticEventPersistence.instance();
+    private DiagnosticEvent event = new DummyEvent();
+
+    @Setup
+    public void setup()
+    {
+        OverrideConfigurationLoader.override((config) -> {
+            config.diagnostic_events_enabled = true;
+        });
+        DatabaseDescriptor.daemonInitialization();
+
+        service.cleanup();
+
+        // make persistence subscribe to and store events
+        persistence.enableEventPersistence(DummyEvent.class.getName());
+    }
+
+    @Benchmark
+    public void persistEvents()
+    {
+        service.publish(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java
 
b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java
new file mode 100644
index 0000000..351138e
--- /dev/null
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.OverrideConfigurationLoader;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2)
+@Threads(4)
+@State(Scope.Benchmark)
+public class DiagnosticEventServiceBench
+{
+    private DiagnosticEventService service = DiagnosticEventService.instance();
+    private DiagnosticEvent event = new DummyEvent();
+
+    @Param({ "0", "1", "12" })
+    private int subscribers = 0;
+
+    @Setup
+    public void setup()
+    {
+        OverrideConfigurationLoader.override((config) -> {
+            config.diagnostic_events_enabled = true;
+        });
+        DatabaseDescriptor.daemonInitialization();
+
+        service.cleanup();
+
+        for (int i = 0; i < subscribers; i++)
+        {
+            service.subscribe(DummyEvent.class, new Consumer<DummyEvent>()
+            {
+                public void accept(DummyEvent dummyEvent)
+                {
+                    // No-op
+                }
+            });
+        }
+    }
+
+    @Benchmark
+    public void publishEvents()
+    {
+        service.publish(event);
+    }
+
+    final static class DummyEvent extends DiagnosticEvent
+    {
+        public Enum<?> getType()
+        {
+            return null;
+        }
+
+        public Object getSource()
+        {
+            return null;
+        }
+
+        public Map<String, Serializable> toMap()
+        {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java 
b/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java
new file mode 100644
index 0000000..e0a5576
--- /dev/null
+++ b/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+/**
+ * Helper class for programmatically overriding individual config values 
before DatabaseDescriptor is bootstrapped.
+ */
+public class OverrideConfigurationLoader implements ConfigurationLoader
+{
+
+    private static Consumer<Config> configModifier;
+
+    public Config loadConfig() throws ConfigurationException
+    {
+        YamlConfigurationLoader loader = new YamlConfigurationLoader();
+        Config config = loader.loadConfig();
+        configModifier.accept(config);
+        return config;
+    }
+
+    public static void override(Consumer<Config> modifier)
+    {
+        System.setProperty(Config.PROPERTY_PREFIX + "config.loader", 
OverrideConfigurationLoader.class.getName());
+        configModifier = modifier;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java 
b/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java
index 0db5ef6..a645c03 100644
--- a/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java
+++ b/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java
@@ -214,7 +214,7 @@ public class DiagnosticEventServiceTest
         DatabaseDescriptor.setDiagnosticEventsEnabled(true);
     }
 
-    private static class TestEvent1 extends DiagnosticEvent
+    public static class TestEvent1 extends DiagnosticEvent
     {
         public TestEventType getType()
         {
@@ -227,7 +227,7 @@ public class DiagnosticEventServiceTest
         }
     }
 
-    private static class TestEvent2 extends DiagnosticEvent
+    public static class TestEvent2 extends DiagnosticEvent
     {
         public TestEventType getType()
         {
@@ -240,5 +240,5 @@ public class DiagnosticEventServiceTest
         }
     }
 
-    private enum TestEventType { TEST1, TEST2, TEST3 };
+    public enum TestEventType { TEST1, TEST2, TEST3 };
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java 
b/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java
new file mode 100644
index 0000000..5e897b6
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.diag.store;
+
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.junit.Test;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.diag.DiagnosticEventServiceTest.TestEvent1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+public class DiagnosticEventMemoryStoreTest
+{
+    @Test
+    public void testEmpty()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+        assertEquals(0, store.size());
+        assertEquals(0, store.scan(0L, 10).size());
+    }
+
+    @Test
+    public void testSingle()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+        store.store(new TestEvent1());
+        assertEquals(1, store.size());
+        assertEquals(1, store.scan(0L, 10).size());
+    }
+
+    @Test
+    public void testIdentity()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+        TestEvent1 e1 = new TestEvent1();
+        TestEvent1 e2 = new TestEvent1();
+        TestEvent1 e3 = new TestEvent1();
+
+        store.store(e1);
+        store.store(e2);
+        store.store(e3);
+
+        assertEquals(3, store.size());
+
+        NavigableMap<Long, DiagnosticEvent> res = store.scan(0L, 10);
+        assertEquals(3, res.size());
+
+        Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry();
+        assertEquals(new Long(1), entry.getKey());
+        assertSame(e1, entry.getValue());
+
+        entry = res.pollFirstEntry();
+        assertEquals(new Long(2), entry.getKey());
+        assertSame(e2, entry.getValue());
+
+        entry = res.pollFirstEntry();
+        assertEquals(new Long(3), entry.getKey());
+        assertSame(e3, entry.getValue());
+    }
+
+    @Test
+    public void testLimit()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+
+        TestEvent1 e1 = new TestEvent1();
+        TestEvent1 e2 = new TestEvent1();
+        TestEvent1 e3 = new TestEvent1();
+
+        store.store(e1);
+        store.store(e2);
+        store.store(e3);
+
+        NavigableMap<Long, DiagnosticEvent> res = store.scan(0L, 2);
+        assertEquals(2, res.size());
+
+        Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry();
+        assertEquals(new Long(1), entry.getKey());
+        assertSame(e1, entry.getValue());
+
+        entry = res.pollLastEntry();
+        assertEquals(new Long(2), entry.getKey());
+        assertSame(e2, entry.getValue());
+    }
+
+    @Test
+    public void testSeek()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+
+        TestEvent1 e2 = new TestEvent1();
+        TestEvent1 e3 = new TestEvent1();
+
+        store.store(new TestEvent1());
+        store.store(e2);
+        store.store(e3);
+        store.store(new TestEvent1());
+        store.store(new TestEvent1());
+        store.store(new TestEvent1());
+
+        NavigableMap<Long, DiagnosticEvent> res = store.scan(2L, 2);
+        assertEquals(2, res.size());
+
+        Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry();
+        assertEquals(new Long(2), entry.getKey());
+        assertSame(e2, entry.getValue());
+
+        entry = res.pollLastEntry();
+        assertEquals(new Long(3), entry.getKey());
+        assertSame(e3, entry.getValue());
+    }
+
+    @Test
+    public void testMaxElements()
+    {
+        DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore();
+        store.setMaxSize(3);
+
+        store.store(new TestEvent1());
+        store.store(new TestEvent1());
+        store.store(new TestEvent1());
+        TestEvent1 e2 = new TestEvent1(); // 4
+        TestEvent1 e3 = new TestEvent1();
+        store.store(e2);
+        store.store(e3);
+        store.store(new TestEvent1()); // 6
+
+        assertEquals(3, store.size());
+
+        NavigableMap<Long, DiagnosticEvent> res = store.scan(4L, 2);
+        assertEquals(2, res.size());
+
+        Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry();
+        assertEquals(new Long(4), entry.getKey());
+        assertSame(e2, entry.getValue());
+
+        entry = res.pollFirstEntry();
+        assertEquals(new Long(5), entry.getKey());
+        assertSame(e3, entry.getValue());
+
+        store.store(new TestEvent1()); // 7
+        store.store(new TestEvent1());
+        store.store(new TestEvent1());
+
+        res = store.scan(4L, 2);
+        assertEquals(2, res.size());
+        assertEquals(new Long(7), res.pollFirstEntry().getKey());
+        assertEquals(new Long(8), res.pollLastEntry().getKey());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to