Add JMX query support for diagnostic events patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-14435
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a79e5903 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a79e5903 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a79e5903 Branch: refs/heads/trunk Commit: a79e5903b552e40f77c151e23172f054ffb7f39e Parents: 2846b22 Author: Stefan Podkowinski <[email protected]> Authored: Wed May 2 13:03:10 2018 +0200 Committer: Stefan Podkowinski <[email protected]> Committed: Fri Aug 17 14:07:45 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 1 - .../diag/DiagnosticEventPersistence.java | 151 ++++++++++++++++ .../cassandra/diag/DiagnosticEventService.java | 65 ++++++- .../diag/DiagnosticEventServiceMBean.java | 59 +++++++ .../cassandra/diag/LastEventIdBroadcaster.java | 150 ++++++++++++++++ .../diag/LastEventIdBroadcasterMBean.java | 41 +++++ .../diag/store/DiagnosticEventMemoryStore.java | 97 +++++++++++ .../diag/store/DiagnosticEventStore.java | 52 ++++++ .../cassandra/service/StorageService.java | 5 +- .../progress/jmx/JMXBroadcastExecutor.java | 35 ++++ .../DiagnosticEventPersistenceBench.java | 73 ++++++++ .../microbench/DiagnosticEventServiceBench.java | 103 +++++++++++ .../config/OverrideConfigurationLoader.java | 47 +++++ .../diag/DiagnosticEventServiceTest.java | 6 +- .../store/DiagnosticEventMemoryStoreTest.java | 170 +++++++++++++++++++ 16 files changed, 1047 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ceba843..097e7dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435) * Add base classes for diagnostic 
events (CASSANDRA-13457) * Clear view system metadata when dropping keyspace (CASSANDRA-14646) * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 65a34f0..aa5ca92 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2541,7 +2541,6 @@ public class DatabaseDescriptor return conf.diagnostic_events_enabled; } - @VisibleForTesting public static void setDiagnosticEventsEnabled(boolean enabled) { conf.diagnostic_events_enabled = enabled; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java b/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java new file mode 100644 index 0000000..7da335c --- /dev/null +++ b/src/java/org/apache/cassandra/diag/DiagnosticEventPersistence.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.diag; + +import java.io.InvalidClassException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.diag.store.DiagnosticEventMemoryStore; +import org.apache.cassandra.diag.store.DiagnosticEventStore; + + +/** + * Manages storing and retrieving events based on enabled {@link DiagnosticEventStore} implementation. 
+ */ +public final class DiagnosticEventPersistence +{ + private static final Logger logger = LoggerFactory.getLogger(DiagnosticEventPersistence.class); + + private static final DiagnosticEventPersistence instance = new DiagnosticEventPersistence(); + + private final Map<Class, DiagnosticEventStore<Long>> stores = new ConcurrentHashMap<>(); + + private final Consumer<DiagnosticEvent> eventConsumer = this::onEvent; + + public static void start() + { + // make sure id broadcaster is initialized (registered as MBean) + LastEventIdBroadcaster.instance(); + } + + public static DiagnosticEventPersistence instance() + { + return instance; + } + + public SortedMap<Long, Map<String, Serializable>> getEvents(String eventClazz, Long key, int limit, boolean includeKey) + { + assert eventClazz != null; + assert key != null; + assert limit >= 0; + + Class cls; + try + { + cls = getEventClass(eventClazz); + } + catch (ClassNotFoundException | InvalidClassException e) + { + throw new RuntimeException(e); + } + DiagnosticEventStore<Long> store = getStore(cls); + + NavigableMap<Long, DiagnosticEvent> events = store.scan(key, includeKey ? 
limit : limit + 1); + if (!includeKey && !events.isEmpty()) events = events.tailMap(key, false); + TreeMap<Long, Map<String, Serializable>> ret = new TreeMap<>(); + for (Map.Entry<Long, DiagnosticEvent> entry : events.entrySet()) + { + DiagnosticEvent event = entry.getValue(); + HashMap<String, Serializable> val = new HashMap<>(event.toMap()); + val.put("class", event.getClass().getName()); + val.put("type", event.getType().name()); + val.put("ts", event.timestamp); + val.put("thread", event.threadName); + ret.put(entry.getKey(), val); + } + logger.debug("Returning {} {} events for key {} (limit {}) (includeKey {})", ret.size(), eventClazz, key, limit, includeKey); + return ret; + } + + public void enableEventPersistence(String eventClazz) + { + try + { + logger.debug("Enabling events: {}", eventClazz); + DiagnosticEventService.instance().subscribe(getEventClass(eventClazz), eventConsumer); + } + catch (ClassNotFoundException | InvalidClassException e) + { + throw new RuntimeException(e); + } + } + + public void disableEventPersistence(String eventClazz) + { + try + { + logger.debug("Disabling events: {}", eventClazz); + DiagnosticEventService.instance().unsubscribe(getEventClass(eventClazz), eventConsumer); + } + catch (ClassNotFoundException | InvalidClassException e) + { + throw new RuntimeException(e); + } + } + + private void onEvent(DiagnosticEvent event) + { + Class<? 
extends DiagnosticEvent> cls = event.getClass(); + if (logger.isTraceEnabled()) + logger.trace("Persisting received {} event", cls.getName()); + DiagnosticEventStore<Long> store = getStore(cls); + store.store(event); + LastEventIdBroadcaster.instance().setLastEventId(event.getClass().getName(), store.getLastEventId()); + } + + private Class<DiagnosticEvent> getEventClass(String eventClazz) throws ClassNotFoundException, InvalidClassException + { + // get class by eventClazz argument name + // restrict class loading for security reasons + if (!eventClazz.startsWith("org.apache.cassandra.")) + throw new RuntimeException("Not a Cassandra event class: " + eventClazz); + + Class<DiagnosticEvent> clazz = (Class<DiagnosticEvent>) Class.forName(eventClazz); + + if (!(DiagnosticEvent.class.isAssignableFrom(clazz))) + throw new InvalidClassException("Event class must be of type DiagnosticEvent"); + + return clazz; + } + + private DiagnosticEventStore<Long> getStore(Class cls) + { + return stores.computeIfAbsent(cls, (storeKey) -> new DiagnosticEventMemoryStore()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java index 577d0ba..3f3de7c 100644 --- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java +++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java @@ -17,12 +17,19 @@ */ package org.apache.cassandra.diag; +import java.io.Serializable; +import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedMap; import java.util.function.Consumer; +import javax.annotation.Nullable; +import javax.management.MBeanServer; +import javax.management.ObjectName; + 
import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; @@ -38,7 +45,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; /** * Service for publishing and consuming {@link DiagnosticEvent}s. */ -public final class DiagnosticEventService +public final class DiagnosticEventService implements DiagnosticEventServiceMBean { private static final Logger logger = LoggerFactory.getLogger(DiagnosticEventService.class); @@ -55,6 +62,20 @@ public final class DiagnosticEventService private DiagnosticEventService() { + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService"); + mbs.registerMBean(this, jmxObjectName); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + // register broadcasters for JMX events + DiagnosticEventPersistence.start(); } /** @@ -99,10 +120,12 @@ public final class DiagnosticEventService */ public synchronized <E extends DiagnosticEvent> void subscribe(Class<E> event, Consumer<E> consumer) { + logger.debug("Adding subscriber: {}", consumer); subscribersByClass = ImmutableSetMultimap.<Class<? extends DiagnosticEvent>, Consumer<DiagnosticEvent>>builder() .putAll(subscribersByClass) .put(event, new TypedConsumerWrapper<>(consumer)) .build(); + logger.debug("Total subscribers: {}", subscribersByClass.values().size()); } /** @@ -149,6 +172,16 @@ public final class DiagnosticEventService */ public synchronized <E extends DiagnosticEvent> void unsubscribe(Consumer<E> consumer) { + unsubscribe(null, consumer); + } + + /** + * De-registers event handler from receiving any further events. 
+ * @param event DiagnosticEvent class to unsubscribe from + * @param consumer Consumer registered for receiving events + */ + public synchronized <E extends DiagnosticEvent> void unsubscribe(@Nullable Class<E> event, Consumer<E> consumer) + { // all events subscribersAll = ImmutableSet.copyOf(Iterables.filter(subscribersAll, (c) -> c != consumer)); @@ -161,7 +194,8 @@ public final class DiagnosticEventService if (subscriber instanceof TypedConsumerWrapper) subscriber = ((TypedConsumerWrapper)subscriber).wrapped; - if (subscriber != consumer) + // other consumers or other events + if (subscriber != consumer || (event != null && !entry.getKey().equals(event))) { byClassBuilder = byClassBuilder.put(entry); } @@ -181,7 +215,7 @@ public final class DiagnosticEventService Consumer<DiagnosticEvent> subscriber = e.getValue(); if (subscriber instanceof TypedConsumerWrapper) subscriber = ((TypedConsumerWrapper) subscriber).wrapped; - return subscriber != consumer; + return subscriber != consumer || (event != null && !byClassEntry.getKey().equals(event)); }).forEach(byTypeBuilder::put); ImmutableSetMultimap<Enum<?>, Consumer<DiagnosticEvent>> byType = byTypeBuilder.build(); @@ -258,6 +292,31 @@ public final class DiagnosticEventService subscribersByClassAndType = ImmutableMap.of(); } + public boolean isDiagnosticsEnabled() + { + return DatabaseDescriptor.diagnosticEventsEnabled(); + } + + public void disableDiagnostics() + { + DatabaseDescriptor.setDiagnosticEventsEnabled(false); + } + + public SortedMap<Long, Map<String, Serializable>> readEvents(String eventClazz, Long lastKey, int limit) + { + return DiagnosticEventPersistence.instance().getEvents(eventClazz, lastKey, limit, false); + } + + public void enableEventPersistence(String eventClazz) + { + DiagnosticEventPersistence.instance().enableEventPersistence(eventClazz); + } + + public void disableEventPersistence(String eventClazz) + { + DiagnosticEventPersistence.instance().disableEventPersistence(eventClazz); + } + 
/** * Wrapper class for supporting typed event handling for consumers. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java b/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java new file mode 100644 index 0000000..b3af8a7 --- /dev/null +++ b/src/java/org/apache/cassandra/diag/DiagnosticEventServiceMBean.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.diag; + +import java.io.Serializable; +import java.util.Map; +import java.util.SortedMap; + +/** + * Provides JMX enabled attributes and operations implemented by {@link DiagnosticEventService}. + */ +public interface DiagnosticEventServiceMBean +{ + /* + * Indicates if any events will be published. + */ + boolean isDiagnosticsEnabled(); + + /** + * Kill switch for disabling all events immediately, without restarting the node. Please edit cassandra.yaml for + * making this permanent. 
+ */ + void disableDiagnostics(); + + /** + * Retrieves all events of the specified type with an ID greater than the provided key. Result will be sorted chronologically. + * + * @param eventClazz fqn of event class + * @param lastKey ID of the last event already retrieved; only events with a greater ID are returned + * @param limit number of results to return + */ + SortedMap<Long, Map<String, Serializable>> readEvents(String eventClazz, Long lastKey, int limit); + + /** + * Start storing events to make them available via {@link #readEvents(String, Long, int)}. + */ + void enableEventPersistence(String eventClazz); + + /** + * Stop storing events. + */ + void disableEventPersistence(String eventClazz); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java new file mode 100644 index 0000000..9fe5c48 --- /dev/null +++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.diag; + +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import javax.management.MBeanServer; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectName; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; + +/** + * Broadcaster for notifying JMX clients on newly available data. Periodically sends {@link Notification}s + * containing a list of event types and greatest event IDs. Consumers may use this information to + * query or poll events based on this data. 
+ */ +final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implements LastEventIdBroadcasterMBean +{ + + private final static LastEventIdBroadcaster instance = new LastEventIdBroadcaster(); + + private final static int PERIODIC_BROADCAST_INTERVAL_MILLIS = 30000; + private final static int SHORT_TERM_BROADCAST_DELAY_MILLIS = 1000; + + private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final AtomicReference<ScheduledFuture<?>> scheduledPeriodicalBroadcast = new AtomicReference<>(); + private final AtomicReference<ScheduledFuture<?>> scheduledShortTermBroadcast = new AtomicReference<>(); + + private final Map<String, Comparable> summary = new ConcurrentHashMap<>(); + + + private LastEventIdBroadcaster() + { + // use dedicated executor for handling JMX notifications + super(JMXBroadcastExecutor.executor); + + summary.put("last_updated_at", 0L); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster"); + mbs.registerMBean(this, jmxObjectName); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public static LastEventIdBroadcaster instance() + { + return instance; + } + + public Map<String, Comparable> getLastEventIds() + { + return summary; + } + + public Map<String, Comparable> getLastEventIdsIfModified(long lastUpdate) + { + if (lastUpdate >= (long)summary.get("last_updated_at")) return summary; + else return getLastEventIds(); + } + + public synchronized void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) + { + super.addNotificationListener(listener, filter, handback); + + // lazily schedule periodical broadcast once we got our first subscriber + if (scheduledPeriodicalBroadcast.get() == null) + { + ScheduledFuture<?> scheduledFuture = ScheduledExecutors.scheduledTasks + .scheduleAtFixedRate(this::broadcastEventIds, + 
PERIODIC_BROADCAST_INTERVAL_MILLIS, + PERIODIC_BROADCAST_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + if (!this.scheduledPeriodicalBroadcast.compareAndSet(null, scheduledFuture)) + scheduledFuture.cancel(false); + } + } + + public void setLastEventId(String key, Comparable id) + { + // ensure monotonic properties of ids + if (summary.compute(key, (k, v) -> v == null ? id : id.compareTo(v) > 0 ? id : v) == id) { + summary.put("last_updated_at", System.currentTimeMillis()); + scheduleBroadcast(); + } + } + + private void scheduleBroadcast() + { + // schedule broadcast for timely announcing new events before next periodical broadcast + // this should allow us to buffer new updates for a while, while keeping broadcasts near-time + ScheduledFuture<?> running = scheduledShortTermBroadcast.get(); + if (running == null || running.isDone()) + { + ScheduledFuture<?> scheduledFuture = ScheduledExecutors.scheduledTasks + .schedule((Runnable)this::broadcastEventIds, + SHORT_TERM_BROADCAST_DELAY_MILLIS, + TimeUnit.MILLISECONDS); + if (!this.scheduledShortTermBroadcast.compareAndSet(running, scheduledFuture)) + scheduledFuture.cancel(false); + } + } + + private void broadcastEventIds() + { + if (!summary.isEmpty()) + broadcastEventIds(summary); + } + + private void broadcastEventIds(Map<String, Comparable> summary) + { + Notification notification = new Notification("event_last_id_summary", + "LastEventIdBroadcaster", + notificationSerialNumber.incrementAndGet(), + System.currentTimeMillis(), + "Event last IDs summary"); + notification.setUserData(summary); + sendNotification(notification); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java new file mode 100644 index 
0000000..03f05dc --- /dev/null +++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcasterMBean.java @@ -0,0 +1,41 @@ +package org.apache.cassandra.diag; + +import java.util.Map; + +/** + * Provides a list of event types and the corresponding highest event IDs. Consumers may use these IDs to determine + * if new data is available. + * + * <p>Example result</p> + * + * <table> + * <tr> + * <th>Event</th> + * <th>Last ID</th> + * </tr> + * <tr> + * <td>BootstrapEvent</td> + * <td>312</td> + * </tr> + * <tr> + * <td>CompactionEvent</td> + * <td>a53f9338-5f24-11e8-9c2d-fa7ae01bbebc</td> + * </tr> + * </table> + * + * <p>Clients may either retrieve the current list of all event IDs, or make conditional requests for event IDs + * based on the timestamp of the last update (much in the sense of e.g. HTTP's If-Modified-Since semantics).</p> + */ +public interface LastEventIdBroadcasterMBean +{ + /** + * Retrieves a list of all event types and their highest IDs. + */ + Map<String, Comparable> getLastEventIds(); + + /** + * Retrieves a list of all event types and their highest IDs, if updated since specified timestamp, or null. + * @param lastUpdate timestamp to use to determine if IDs have been updated + */ + Map<String, Comparable> getLastEventIdsIfModified(long lastUpdate); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java b/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java new file mode 100644 index 0000000..92fd42b --- /dev/null +++ b/src/java/org/apache/cassandra/diag/store/DiagnosticEventMemoryStore.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.diag.store; + +import java.util.Comparator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.diag.DiagnosticEvent; + +/** + * Simple on-heap memory store that allows to buffer and retrieve a fixed number of events. 
+ */ +public final class DiagnosticEventMemoryStore implements DiagnosticEventStore<Long> +{ + private final AtomicLong lastKey = new AtomicLong(0); + + private int maxSize = 200; + + // event access will mostly happen based on a recent event offset, so we add new events to the head of the list + // for optimized search times + private final ConcurrentSkipListMap<Long, DiagnosticEvent> events = new ConcurrentSkipListMap<>(Comparator.reverseOrder()); + + public void load() + { + // no-op + } + + public void store(DiagnosticEvent event) + { + long keyHead = lastKey.incrementAndGet(); + events.put(keyHead, event); + + // remove the oldest elements once max size is exceeded + if (keyHead > maxSize) events.tailMap(keyHead - maxSize).clear(); + } + + public NavigableMap<Long, DiagnosticEvent> scan(Long id, int limit) + { + assert id != null && id >= 0; + assert limit >= 0; + + // [10..1].headMap(2, true): [10..2] + ConcurrentNavigableMap<Long, DiagnosticEvent> newerEvents = events.headMap(id, true); + // [2..10] + ConcurrentNavigableMap<Long, DiagnosticEvent> ret = newerEvents.descendingMap(); + if (limit == 0) + { + return ret; + } + else + { + Map.Entry<Long, DiagnosticEvent> first = ret.firstEntry(); + if (first == null) return ret; + else return ret.headMap(first.getKey() + limit); + } + } + + public Long getLastEventId() + { + return lastKey.get(); + } + + @VisibleForTesting + int size() + { + return events.size(); + } + + @VisibleForTesting + void setMaxSize(int maxSize) + { + this.maxSize = maxSize; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java b/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java new file mode 100644 index 0000000..86b2df3 --- /dev/null +++ 
b/src/java/org/apache/cassandra/diag/store/DiagnosticEventStore.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.diag.store; + +import java.util.NavigableMap; + +import org.apache.cassandra.diag.DiagnosticEvent; + +/** + * Enables storing and retrieving {@link DiagnosticEvent}s. + * @param <T> type of key that is used to reference an event + */ +public interface DiagnosticEventStore<T extends Comparable<T>> +{ + /** + * Initializes the store. + */ + void load(); + + /** + * Stores the provided event under a new store key. The key is not returned by this method. + */ + void store(DiagnosticEvent event); + + /** + * Returns a view on all events with a key greater than or equal to the provided value, up to the specified + * number of results. Events may be added or become unavailable over time. Keys must be unique, sortable and + * monotonically incrementing. Returns an empty map in case no events could be found. + */ + NavigableMap<T, DiagnosticEvent> scan(T key, int limit); + + /** + * Returns the greatest event ID that can be used to fetch events via {@link #scan(Comparable, int)}. 
+ */ + T getLastEventId(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index cc02e17..84ec49d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -111,6 +111,7 @@ import org.apache.cassandra.utils.TopKSampler.SamplerResult; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; import static java.util.Arrays.asList; @@ -261,8 +262,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public StorageService() { - // use dedicated executor for sending JMX notifications - super(Executors.newSingleThreadExecutor()); + // use dedicated executor for handling JMX notifications + super(JMXBroadcastExecutor.executor); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java new file mode 100644 index 0000000..f545f0f --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.progress.jmx; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +/** + * Holds dedicated executor for JMX event handling. Events will be internally queued by ArrayNotificationBuffer, + * synchronized by an exclusive write lock, which makes a shared single thread executor desirable. + */ +public final class JMXBroadcastExecutor +{ + + private JMXBroadcastExecutor() { } + + public final static Executor executor = Executors.newSingleThreadExecutor(); + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java new file mode 100644 index 0000000..5ddf0c5 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventPersistenceBench.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.OverrideConfigurationLoader; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.diag.DiagnosticEventPersistence; +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.test.microbench.DiagnosticEventServiceBench.DummyEvent; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.All) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2) +@Threads(4) +@State(Scope.Benchmark) +public class DiagnosticEventPersistenceBench +{ + private 
DiagnosticEventService service = DiagnosticEventService.instance(); + private DiagnosticEventPersistence persistence = DiagnosticEventPersistence.instance(); + private DiagnosticEvent event = new DummyEvent(); + + @Setup + public void setup() + { + OverrideConfigurationLoader.override((config) -> { + config.diagnostic_events_enabled = true; + }); + DatabaseDescriptor.daemonInitialization(); + + service.cleanup(); + + // make persistence subscribe to and store events + persistence.enableEventPersistence(DummyEvent.class.getName()); + } + + @Benchmark + public void persistEvents() + { + service.publish(event); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java new file mode 100644 index 0000000..351138e --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/DiagnosticEventServiceBench.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.OverrideConfigurationLoader; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.diag.DiagnosticEventService; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2) +@Threads(4) +@State(Scope.Benchmark) +public class DiagnosticEventServiceBench +{ + private DiagnosticEventService service = DiagnosticEventService.instance(); + private DiagnosticEvent event = new DummyEvent(); + + @Param({ "0", "1", "12" }) + private int subscribers = 0; + + @Setup + public void setup() + { + OverrideConfigurationLoader.override((config) -> { + config.diagnostic_events_enabled = true; + }); + DatabaseDescriptor.daemonInitialization(); + + service.cleanup(); + + for (int i = 0; i < subscribers; i++) + { + service.subscribe(DummyEvent.class, new Consumer<DummyEvent>() + { + public void accept(DummyEvent dummyEvent) + { + // No-op + } + }); + } + } + + @Benchmark + public void 
publishEvents() + { + service.publish(event); + } + + final static class DummyEvent extends DiagnosticEvent + { + public Enum<?> getType() + { + return null; + } + + public Object getSource() + { + return null; + } + + public Map<String, Serializable> toMap() + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java b/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java new file mode 100644 index 0000000..e0a5576 --- /dev/null +++ b/test/unit/org/apache/cassandra/config/OverrideConfigurationLoader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.cassandra.exceptions.ConfigurationException; + +/** + * Helper class for programmatically overriding individual config values before DatabaseDescriptor is bootstrapped. 
+ */ +public class OverrideConfigurationLoader implements ConfigurationLoader +{ + + private static Consumer<Config> configModifier; + + public Config loadConfig() throws ConfigurationException + { + YamlConfigurationLoader loader = new YamlConfigurationLoader(); + Config config = loader.loadConfig(); + configModifier.accept(config); + return config; + } + + public static void override(Consumer<Config> modifier) + { + System.setProperty(Config.PROPERTY_PREFIX + "config.loader", OverrideConfigurationLoader.class.getName()); + configModifier = modifier; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java b/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java index 0db5ef6..a645c03 100644 --- a/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java +++ b/test/unit/org/apache/cassandra/diag/DiagnosticEventServiceTest.java @@ -214,7 +214,7 @@ public class DiagnosticEventServiceTest DatabaseDescriptor.setDiagnosticEventsEnabled(true); } - private static class TestEvent1 extends DiagnosticEvent + public static class TestEvent1 extends DiagnosticEvent { public TestEventType getType() { @@ -227,7 +227,7 @@ public class DiagnosticEventServiceTest } } - private static class TestEvent2 extends DiagnosticEvent + public static class TestEvent2 extends DiagnosticEvent { public TestEventType getType() { @@ -240,5 +240,5 @@ public class DiagnosticEventServiceTest } } - private enum TestEventType { TEST1, TEST2, TEST3 }; + public enum TestEventType { TEST1, TEST2, TEST3 }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a79e5903/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java b/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java new file mode 100644 index 0000000..5e897b6 --- /dev/null +++ b/test/unit/org/apache/cassandra/diag/store/DiagnosticEventMemoryStoreTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.diag.store; + +import java.util.Map; +import java.util.NavigableMap; + +import org.junit.Test; + +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.diag.DiagnosticEventServiceTest.TestEvent1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +public class DiagnosticEventMemoryStoreTest +{ + @Test + public void testEmpty() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + assertEquals(0, store.size()); + assertEquals(0, store.scan(0L, 10).size()); + } + + @Test + public void testSingle() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + store.store(new TestEvent1()); + assertEquals(1, store.size()); + assertEquals(1, store.scan(0L, 10).size()); + } + + @Test + public void testIdentity() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + TestEvent1 e1 = new TestEvent1(); + TestEvent1 e2 = new TestEvent1(); + TestEvent1 e3 = new TestEvent1(); + + store.store(e1); + store.store(e2); + store.store(e3); + + assertEquals(3, store.size()); + + NavigableMap<Long, DiagnosticEvent> res = store.scan(0L, 10); + assertEquals(3, res.size()); + + Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry(); + assertEquals(new Long(1), entry.getKey()); + assertSame(e1, entry.getValue()); + + entry = res.pollFirstEntry(); + assertEquals(new Long(2), entry.getKey()); + assertSame(e2, entry.getValue()); + + entry = res.pollFirstEntry(); + assertEquals(new Long(3), entry.getKey()); + assertSame(e3, entry.getValue()); + } + + @Test + public void testLimit() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + + TestEvent1 e1 = new TestEvent1(); + TestEvent1 e2 = new TestEvent1(); + TestEvent1 e3 = new TestEvent1(); + + store.store(e1); + store.store(e2); + store.store(e3); + + NavigableMap<Long, DiagnosticEvent> res = store.scan(0L, 2); + assertEquals(2, res.size()); + + 
Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry(); + assertEquals(new Long(1), entry.getKey()); + assertSame(e1, entry.getValue()); + + entry = res.pollLastEntry(); + assertEquals(new Long(2), entry.getKey()); + assertSame(e2, entry.getValue()); + } + + @Test + public void testSeek() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + + TestEvent1 e2 = new TestEvent1(); + TestEvent1 e3 = new TestEvent1(); + + store.store(new TestEvent1()); + store.store(e2); + store.store(e3); + store.store(new TestEvent1()); + store.store(new TestEvent1()); + store.store(new TestEvent1()); + + NavigableMap<Long, DiagnosticEvent> res = store.scan(2L, 2); + assertEquals(2, res.size()); + + Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry(); + assertEquals(new Long(2), entry.getKey()); + assertSame(e2, entry.getValue()); + + entry = res.pollLastEntry(); + assertEquals(new Long(3), entry.getKey()); + assertSame(e3, entry.getValue()); + } + + @Test + public void testMaxElements() + { + DiagnosticEventMemoryStore store = new DiagnosticEventMemoryStore(); + store.setMaxSize(3); + + store.store(new TestEvent1()); + store.store(new TestEvent1()); + store.store(new TestEvent1()); + TestEvent1 e2 = new TestEvent1(); // 4 + TestEvent1 e3 = new TestEvent1(); + store.store(e2); + store.store(e3); + store.store(new TestEvent1()); // 6 + + assertEquals(3, store.size()); + + NavigableMap<Long, DiagnosticEvent> res = store.scan(4L, 2); + assertEquals(2, res.size()); + + Map.Entry<Long, DiagnosticEvent> entry = res.pollFirstEntry(); + assertEquals(new Long(4), entry.getKey()); + assertSame(e2, entry.getValue()); + + entry = res.pollFirstEntry(); + assertEquals(new Long(5), entry.getKey()); + assertSame(e3, entry.getValue()); + + store.store(new TestEvent1()); // 7 + store.store(new TestEvent1()); + store.store(new TestEvent1()); + + res = store.scan(4L, 2); + assertEquals(2, res.size()); + assertEquals(new Long(7), res.pollFirstEntry().getKey()); + 
assertEquals(new Long(8), res.pollLastEntry().getKey()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
