Repository: cassandra Updated Branches: refs/heads/trunk d76adf500 -> 4adb98146
Generalize progress reporting patch by yukim; reviewed by Josh McKenzie for CASSANDRA-8901 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4adb9814 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4adb9814 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4adb9814 Branch: refs/heads/trunk Commit: 4adb9814639d6e4225105e0c364044fac8b4b84d Parents: d76adf5 Author: Yuki Morishita <[email protected]> Authored: Tue Mar 10 14:17:31 2015 -0500 Committer: Yuki Morishita <[email protected]> Committed: Tue Mar 10 14:17:31 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/net/OutboundTcpConnection.java | 2 +- .../apache/cassandra/repair/RepairRunnable.java | 394 +++++++++++++++++++ .../cassandra/service/ActiveRepairService.java | 6 - .../cassandra/service/StorageService.java | 282 +------------ .../org/apache/cassandra/tools/NodeProbe.java | 2 - .../apache/cassandra/tools/RepairRunner.java | 99 +++-- .../apache/cassandra/tracing/TraceState.java | 45 ++- .../cassandra/utils/progress/ProgressEvent.java | 75 ++++ .../utils/progress/ProgressEventNotifier.java | 38 ++ .../utils/progress/ProgressEventType.java | 72 ++++ .../utils/progress/ProgressListener.java | 32 ++ .../jmx/JMXNotificationProgressListener.java | 94 +++++ .../utils/progress/jmx/JMXProgressSupport.java | 58 +++ 14 files changed, 865 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 29e1edf..2de6137 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -62,6 +62,7 @@ * Improve concurrency of repair (CASSANDRA-6455, 8208) * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) * Evaluate MurmurHash of Token once per 
query (CASSANDRA-7096) + * Generalize progress reporting (CASSANDRA-8901) 2.1.4 * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 5130974..6db83b4 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -217,7 +217,7 @@ public class OutboundTcpConnection extends Thread { byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE); Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL(), null); + TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL()); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java new file mode 100644 index 0000000..89c0d70 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.*; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; 
+import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventNotifier; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; + +public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier +{ + private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); + + private StorageService storageService; + private final int cmd; + private final RepairOption options; + private final String keyspace; + + private final List<ProgressListener> listeners = new ArrayList<>(); + + public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) + { + this.storageService = storageService; + this.cmd = cmd; + this.options = options; + this.keyspace = keyspace; + } + + @Override + public void addProgressListener(ProgressListener listener) + { + listeners.add(listener); + } + + @Override + public void removeProgressListener(ProgressListener listener) + { + listeners.remove(listener); + } + + protected void fireProgressEvent(String tag, ProgressEvent event) + { + for (ProgressListener listener : listeners) + { + listener.progress(tag, event); + } + } + + protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress)); + } + + protected void runMayThrow() throws Exception + { + final TraceState traceState; + + final String tag = "repair:" + cmd; + + final AtomicInteger progress = new AtomicInteger(); + final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, 
validation, prepare for repair + number of ranges to repair + + String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); + Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, + columnFamilies); + + final long startTime = System.currentTimeMillis(); + String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, + options); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); + if (options.isTraced()) + { + StringBuilder cfsb = new StringBuilder(); + for (ColumnFamilyStore cfs : validColumnFamilies) + cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); + + UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); + traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", + cfsb.substring(2))); + Tracing.traceRepair(message); + traceState.enableActivityNotification(tag); + for (ProgressListener listener : listeners) + traceState.addProgressListener(listener); + Thread queryThread = createQueryThread(cmd, sessionId); + queryThread.setName("RepairTracePolling"); + queryThread.start(); + } + else + { + traceState = null; + } + + final Set<InetAddress> allNeighbors = new HashSet<>(); + Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); + try + { + for (Range<Token> range : options.getRanges()) + { + Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, + options.getDataCenters(), + options.getHosts()); + rangeToNeighbors.put(range, neighbors); + allNeighbors.addAll(neighbors); + } + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + logger.error("Repair failed:", e); + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + // Validate columnfamilies + List<ColumnFamilyStore> 
columnFamilyStores = new ArrayList<>(); + try + { + Iterables.addAll(columnFamilyStores, validColumnFamilies); + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + final UUID parentSession; + long repairedAt; + try + { + parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores); + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; + progress.incrementAndGet(); + } + catch (Throwable t) + { + fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); + return; + } + + // Set up RepairJob executor for this repair command. + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); + String[] cfnames = new String[columnFamilyStores.size()]; + for (int i = 0; i < columnFamilyStores.size(); i++) + { + cfnames[i] = columnFamilyStores.get(i).name; + } + for (Range<Token> range : options.getRanges()) + { + final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + range, + keyspace, + options.getParallelism(), + rangeToNeighbors.get(range), + repairedAt, + executor, + cfnames); + if (session == null) + continue; + // After repair session completes, notify client its result + Futures.addCallback(session, new FutureCallback<RepairSessionResult>() + { + public void onSuccess(RepairSessionResult result) + { + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.getRange().toString()); + logger.info(message); + fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + + public void onFailure(Throwable t) + { + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.getRange().toString(), t.getMessage()); + logger.error(message, t); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + }); + futures.add(session); + } + + // After all repair sessions completes(successful or not), + // run anticompaction if necessary and send finish notice back to client + final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); + Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>() + { + public void onSuccess(List<RepairSessionResult> result) + { + boolean hasFailure = false; + // filter out null(=failed) results and get successful ranges + Collection<Range<Token>> successfulRanges = new ArrayList<>(); + for (RepairSessionResult sessionResult : result) + { + if (sessionResult != null) + { + successfulRanges.add(sessionResult.range); + } + else + { + hasFailure = true; + } + } + try + { + ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); + } + catch (Exception e) + { + logger.error("Error in incremental repair", e); + } + if (hasFailure) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, + "Some repair failed")); + } + else + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + "Repair completed successfully")); + } + repairComplete(); + } + + public void onFailure(Throwable t) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + repairComplete(); + } + + private void repairComplete() + { + String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); + String message = String.format("Repair command #%d finished in %s", cmd, duration); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + logger.info(message); + if (options.isTraced() && traceState != null) + { + for (ProgressListener listener : listeners) + traceState.removeProgressListener(listener); + // Because DebuggableThreadPoolExecutor#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. + Tracing.instance.set(traceState); + Tracing.traceRepair(message); + Tracing.instance.stopSession(); + } + executor.shutdownNow(); + } + }); + } + + private Thread createQueryThread(final int cmd, final UUID sessionId) + { + return new Thread(new WrappedRunnable() + { + // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. + // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. + public void runMayThrow() throws Exception + { + TraceState state = Tracing.instance.get(sessionId); + if (state == null) + throw new Exception("no tracestate"); + + String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? 
and event_id < ?;"; + String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS); + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; + + ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); + InetAddress source = FBUtilities.getBroadcastAddress(); + + HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() }; + int si = 0; + UUID uuid; + + long tlast = System.currentTimeMillis(), tcur; + + TraceState.Status status; + long minWaitMillis = 125; + long maxWaitMillis = 1000 * 1024L; + long timeout = minWaitMillis; + boolean shouldDouble = false; + + while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) + { + if (status == TraceState.Status.IDLE) + { + timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout; + shouldDouble = !shouldDouble; + } + else + { + timeout = minWaitMillis; + shouldDouble = false; + } + ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000)); + ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis())); + QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, + tminBytes, + tmaxBytes)); + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); + UntypedResultSet result = UntypedResultSet.create(rows.result); + + for (UntypedResultSet.Row r : result) + { + if (source.equals(r.getInetAddress("source"))) + continue; + if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) + seen[si].add(uuid); + if (seen[si == 0 ? 1 : 0].contains(uuid)) + continue; + String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); + fireProgressEvent("repair:" + cmd, + new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message)); + } + tlast = tcur; + + si = si == 0 ? 
1 : 0; + seen[si].clear(); + } + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index faa32c2..7c8e1cc 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -57,7 +57,6 @@ import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.Refs; @@ -83,11 +82,6 @@ public class ActiveRepairService public static final long UNREPAIRED_SSTABLE = 0; - public static enum Status - { - STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING - } - /** * A map of active coordinator session. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index f616710..35c67c4 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import javax.management.*; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; @@ -35,7 +34,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +45,6 @@ import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.auth.AuthMigrationListener; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; -import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; @@ -77,10 +71,8 @@ import org.apache.cassandra.thrift.EndpointDetails; import org.apache.cassandra.thrift.TokenRange; import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import 
org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; import static java.nio.charset.StandardCharsets.ISO_8859_1; @@ -96,8 +88,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized - /* JMX notification serial number counter */ - private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final JMXProgressSupport progressSupport = new JMXProgressSupport(this); private static int getRingDelay() { @@ -2419,20 +2410,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - /** - * Sends JMX notification to subscribers. - * - * @param type Message type - * @param message Message itself - * @param userObject Arbitrary object to attach to notification - */ - public void sendNotification(String type, String message, Object userObject) - { - Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message); - jmxNotification.setUserData(userObject); - sendNotification(jmxNotification); - } - public int repairAsync(String keyspace, Map<String, String> repairSpec) { RepairOption option = RepairOption.parse(repairSpec, getPartitioner()); @@ -2533,7 +2510,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE boolean fullRepair, String... columnFamilies) { - return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies); + return forceRepairRangeAsync(beginToken, endToken, keyspaceName, + isSequential ? 
RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), + dataCenters, hosts, fullRepair, columnFamilies); } public int forceRepairRangeAsync(String beginToken, @@ -2641,75 +2620,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } - private Thread createQueryThread(final int cmd, final UUID sessionId) - { - return new Thread(new WrappedRunnable() - { - // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. - // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. - public void runMayThrow() throws Exception - { - TraceState state = Tracing.instance.get(sessionId); - if (state == null) - throw new Exception("no tracestate"); - - String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;"; - String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS); - SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; - - ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); - InetAddress source = FBUtilities.getBroadcastAddress(); - - HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), new HashSet<UUID>() }; - int si = 0; - UUID uuid; - - long tlast = System.currentTimeMillis(), tcur; - - TraceState.Status status; - long minWaitMillis = 125; - long maxWaitMillis = 1000 * 1024L; - long timeout = minWaitMillis; - boolean shouldDouble = false; - - while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) - { - if (status == TraceState.Status.IDLE) - { - timeout = shouldDouble ? 
Math.min(timeout * 2, maxWaitMillis) : timeout; - shouldDouble = !shouldDouble; - } - else - { - timeout = minWaitMillis; - shouldDouble = false; - } - ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000)); - ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis())); - QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes)); - ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); - UntypedResultSet result = UntypedResultSet.create(rows.result); - - for (UntypedResultSet.Row r : result) - { - if (source.equals(r.getInetAddress("source"))) - continue; - if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) - seen[si].add(uuid); - if (seen[si == 0 ? 1 : 0].contains(uuid)) - continue; - String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.RUNNING.ordinal()}); - } - tlast = tcur; - - si = si == 0 ? 
1 : 0; - seen[si].clear(); - } - } - }); - } - private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options) { if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter())) @@ -2717,184 +2627,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new IllegalArgumentException("the local data center must be part of the repair"); } - return new FutureTask<>(new WrappedRunnable() - { - protected void runMayThrow() throws Exception - { - final TraceState traceState; - - String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); - Iterable<ColumnFamilyStore> validColumnFamilies = getValidColumnFamilies(false, false, keyspace, columnFamilies); - - final long startTime = System.currentTimeMillis(); - String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options); - logger.info(message); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()}); - if (options.isTraced()) - { - StringBuilder cfsb = new StringBuilder(); - for (ColumnFamilyStore cfs : validColumnFamilies) - cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); - - UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); - traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2))); - Tracing.traceRepair(message); - traceState.enableActivityNotification(); - traceState.setNotificationHandle(new int[]{ cmd, ActiveRepairService.Status.RUNNING.ordinal() }); - Thread queryThread = createQueryThread(cmd, sessionId); - queryThread.setName("RepairTracePolling"); - queryThread.start(); - } - else - { - traceState = null; - } - - final Set<InetAddress> allNeighbors = new HashSet<>(); - Map<Range, Set<InetAddress>> rangeToNeighbors = 
new HashMap<>(); - for (Range<Token> range : options.getRanges()) - { - try - { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, options.getDataCenters(), options.getHosts()); - rangeToNeighbors.put(range, neighbors); - allNeighbors.addAll(neighbors); - } - catch (IllegalArgumentException e) - { - logger.error("Repair failed:", e); - sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - return; - } - } - - // Validate columnfamilies - List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); - try - { - Iterables.addAll(columnFamilyStores, validColumnFamilies); - } - catch (IllegalArgumentException e) - { - sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - return; - } - - final UUID parentSession; - long repairedAt; - try - { - parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores); - repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; - } - catch (Throwable t) - { - sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - return; - } - - // Set up RepairJob executor for this repair command. 
- final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), - Integer.MAX_VALUE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("Repair#" + cmd), - "internal")); - - List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - String[] cfnames = new String[columnFamilyStores.size()]; - for (int i = 0; i < columnFamilyStores.size(); i++) - { - cfnames[i] = columnFamilyStores.get(i).name; - } - for (Range<Token> range : options.getRanges()) - { - final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - range, - keyspace, - options.getParallelism(), - rangeToNeighbors.get(range), - repairedAt, - executor, - cfnames); - if (session == null) - continue; - // After repair session completes, notify client its result - Futures.addCallback(session, new FutureCallback<RepairSessionResult>() - { - public void onSuccess(RepairSessionResult result) - { - String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString()); - logger.info(message); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()}); - } - - public void onFailure(Throwable t) - { - String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRange().toString(), t.getMessage()); - logger.error(message, t); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()}); - } - }); - futures.add(session); - } - - // After all repair sessions completes(successful or not), - // run anticompaction if necessary and send finish notice back to client - final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); - Futures.addCallback(allSessions, new 
FutureCallback<List<RepairSessionResult>>() - { - public void onSuccess(List<RepairSessionResult> result) - { - // filter out null(=failed) results and get successful ranges - Collection<Range<Token>> successfulRanges = new ArrayList<>(); - for (RepairSessionResult sessionResult : result) - { - if (sessionResult != null) - { - successfulRanges.add(sessionResult.range); - } - } - try - { - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); - } - catch (Exception e) - { - logger.error("Error in incremental repair", e); - } - repairComplete(); - } - - public void onFailure(Throwable t) - { - repairComplete(); - } - - private void repairComplete() - { - String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true); - String message = String.format("Repair command #%d finished in %s", cmd, duration); - sendNotification("repair", message, - new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - logger.info(message); - if (options.isTraced()) - { - traceState.setNotificationHandle(null); - // Because DebuggableThreadPoolExecutor#afterExecute and this callback - // run in a nondeterministic order (within the same thread), the - // TraceState may have been nulled out at this point. The TraceState - // should be traceState, so just set it without bothering to check if it - // actually was nulled out. 
- Tracing.instance.set(traceState); - Tracing.traceRepair(message); - Tracing.instance.stopSession(); - } - executor.shutdownNow(); - } - }); - } - }, null); + RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace); + task.addProgressListener(progressSupport); + return new FutureTask<>(task, null); } public void forceTerminateAllRepairSessions() { @@ -4213,4 +3948,5 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB)); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 9ee05f2..5012ef5 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -258,8 +258,6 @@ public class NodeProbe implements AutoCloseable jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); runner.run(); - if (!runner.get()) - failed = true; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/RepairRunner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java index 1898bb4..0813775 100644 --- a/src/java/org/apache/cassandra/tools/RepairRunner.java +++ b/src/java/org/apache/cassandra/tools/RepairRunner.java @@ -21,16 +21,15 @@ import java.io.IOException; import java.io.PrintStream; import java.text.SimpleDateFormat; import java.util.Map; -import javax.management.Notification; -import 
javax.management.NotificationListener; -import javax.management.remote.JMXConnectionNotification; +import java.util.concurrent.locks.Condition; -import com.google.common.util.concurrent.AbstractFuture; - -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageServiceMBean; +import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.jmx.JMXNotificationProgressListener; -public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, NotificationListener +public class RepairRunner extends JMXNotificationProgressListener { private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); @@ -38,9 +37,11 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N private final StorageServiceMBean ssProxy; private final String keyspace; private final Map<String, String> options; + private final Condition condition = new SimpleCondition(); - private volatile int cmd; - private volatile boolean success; + private int cmd; + private volatile boolean hasNotificationLost; + private volatile Exception error; public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String keyspace, Map<String, String> options) { @@ -50,52 +51,68 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N this.options = options; } - public void run() + public void run() throws Exception { cmd = ssProxy.repairAsync(keyspace, options); if (cmd <= 0) { String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); out.println(message); - set(true); } - } - - public void handleNotification(Notification notification, Object handback) - { - if ("repair".equals(notification.getType())) + else { - int[] status = (int[]) 
notification.getUserData(); - assert status.length == 2; - if (cmd == status[0]) + condition.await(); + if (error != null) + { + throw error; + } + if (hasNotificationLost) { - String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage()); - out.println(message); - // repair status is int array with [0] = cmd number, [1] = status - if (status[1] == ActiveRepairService.Status.SESSION_FAILED.ordinal()) - { - success = false; - } - else if (status[1] == ActiveRepairService.Status.FINISHED.ordinal()) - { - set(success); - } + out.println(String.format("There were some lost notification(s). You should check server log for repair status of keyspace %s", keyspace)); } } - else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType())) + } + + @Override + public boolean isInterestedIn(String tag) + { + return tag.equals("repair:" + cmd); + } + + @Override + public void handleNotificationLost(long timestamp, String message) + { + hasNotificationLost = true; + } + + @Override + public void handleConnectionClosed(long timestamp, String message) + { + handleConnectionFailed(timestamp, message); + } + + @Override + public void handleConnectionFailed(long timestamp, String message) + { + error = new IOException(String.format("[%s] JMX connection closed. You should check server log for repair status of keyspace %s" + + "(Subsequent keyspaces are not going to be repaired).", + format.format(timestamp), keyspace)); + condition.signalAll(); + } + + @Override + public void progress(String tag, ProgressEvent event) + { + ProgressEventType type = event.getType(); + String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage()); + if (type == ProgressEventType.PROGRESS) { - String message = String.format("[%s] Lost notification. 
You should check server log for repair status of keyspace %s", - format.format(notification.getTimeStamp()), - keyspace); - out.println(message); + message = message + " (progress: " + (int)event.getProgressPercentage() + "%)"; } - else if (JMXConnectionNotification.FAILED.equals(notification.getType()) - || JMXConnectionNotification.CLOSED.equals(notification.getType())) + out.println(message); + if (type == ProgressEventType.COMPLETE) { - String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s" - + "(Subsequent keyspaces are not going to be repaired).", - keyspace); - setException(new IOException(message)); + condition.signalAll(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index c67ad3e..758dceb 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -19,6 +19,8 @@ package org.apache.cassandra.tracing; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -28,15 +30,17 @@ import org.slf4j.helpers.MessageFormatter; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventNotifier; +import org.apache.cassandra.utils.progress.ProgressListener; /** * ThreadLocal state for a tracing session. 
The presence of an instance of this class as a ThreadLocal denotes that an * operation is being traced. */ -public class TraceState +public class TraceState implements ProgressEventNotifier { public final UUID sessionId; public final InetAddress coordinator; @@ -46,13 +50,14 @@ public class TraceState public final int ttl; private boolean notify; - private Object notificationHandle; + private List<ProgressListener> listeners = new ArrayList<>(); + private String tag; public enum Status { IDLE, ACTIVE, - STOPPED; + STOPPED } private Status status; @@ -80,16 +85,30 @@ public class TraceState this.status = Status.IDLE; } - public void enableActivityNotification() + /** + * Activate notification with provided {@code tag} name. + * + * @param tag Tag name to add when emitting notification + */ + public void enableActivityNotification(String tag) { assert traceType == Tracing.TraceType.REPAIR; notify = true; + this.tag = tag; + } + + @Override + public void addProgressListener(ProgressListener listener) + { + assert traceType == Tracing.TraceType.REPAIR; + listeners.add(listener); } - public void setNotificationHandle(Object handle) + @Override + public void removeProgressListener(ProgressListener listener) { assert traceType == Tracing.TraceType.REPAIR; - notificationHandle = handle; + listeners.remove(listener); } public int elapsed() @@ -158,16 +177,18 @@ public class TraceState if (notify) notifyActivity(); - TraceState.trace(sessionIdBytes, message, elapsed(), ttl, notificationHandle); + TraceState.mutateWithTracing(sessionIdBytes, message, elapsed(), ttl); + + for (ProgressListener listener : listeners) + { + listener.progress(tag, ProgressEvent.createNotification(message)); + } } - public static void trace(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl, final Object notificationHandle) + public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl) { final String 
threadName = Thread.currentThread().getName(); - if (notificationHandle != null) - StorageService.instance.sendNotification("repair", message, notificationHandle); - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() { public void runMayThrow() http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java new file mode 100644 index 0000000..31d3120 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils.progress; + +/** + * Progress event + */ +public class ProgressEvent +{ + private final ProgressEventType type; + private final int progressCount; + private final int total; + private final String message; + + public static ProgressEvent createNotification(String message) + { + return new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message); + } + + public ProgressEvent(ProgressEventType type, int progressCount, int total) + { + this(type, progressCount, total, null); + } + + public ProgressEvent(ProgressEventType type, int progressCount, int total, String message) + { + this.type = type; + this.progressCount = progressCount; + this.total = total; + this.message = message; + } + + public ProgressEventType getType() + { + return type; + } + + public int getProgressCount() + { + return progressCount; + } + + public int getTotal() + { + return total; + } + + public double getProgressPercentage() + { + return total != 0 ? progressCount * 100 / (double) total : 0; + } + + /** + * @return Message attached to this event. Can be null. + */ + public String getMessage() + { + return message; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java new file mode 100644 index 0000000..07a6618 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.progress; + +/** + * Interface for {@link ProgressEvent} publisher. + */ +public interface ProgressEventNotifier +{ + /** + * Register progress listener to this publisher. + * + * @param listener listener to register. + */ + void addProgressListener(ProgressListener listener); + + /** + * Remove progress listener from this publisher. + * + * @param listener listener to remove + */ + void removeProgressListener(ProgressListener listener); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java new file mode 100644 index 0000000..8d7daee --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Kinds of {@code ProgressEvent}.
 *
 * <p>
 * The expected life cycle of an observed process is: one {@link #START}, any number of
 * {@link #PROGRESS} events, exactly one of {@link #ERROR}, {@link #ABORT} or {@link #SUCCESS},
 * and finally one {@link #COMPLETE}. {@link #NOTIFICATION} is outside that life cycle and
 * only carries a message.
 * </p>
 * <p>
 * NOTE: the JMX bridge ({@code JMXProgressSupport} / {@code JMXNotificationProgressListener})
 * transmits the ordinal of these constants, so existing constants must keep their position.
 * </p>
 */
public enum ProgressEventType
{
    /**
     * First event of a process; emitted exactly once.
     */
    START,

    /**
     * Emitted each time progress is made; zero or more of these follow START.
     */
    PROGRESS,

    /**
     * The process ended with an error. Sent once, before COMPLETE.
     */
    ERROR,

    /**
     * The process was aborted by the user. Sent once, before COMPLETE.
     */
    ABORT,

    /**
     * The process ended successfully. Sent once, before COMPLETE.
     */
    SUCCESS,

    /**
     * Last event of a process; emitted once, after ERROR/ABORT/SUCCESS.
     * No further ProgressEvent should be fired for the same process afterwards.
     */
    COMPLETE,

    /**
     * Carries a message that is not tied to any progress.
     */
    NOTIFICATION
}
+ * @param event Current progress + */ + void progress(String tag, ProgressEvent event); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java new file mode 100644 index 0000000..3461487 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils.progress.jmx; + +import java.util.Map; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.remote.JMXConnectionNotification; + +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; + +/** + * JMXNotificationProgressListener uses JMX Notification API to convert JMX Notification message to progress event + * and notifies its {@link ProgressListener}s. + * + * This is to be implemented in client tools side. + */ +public abstract class JMXNotificationProgressListener implements ProgressListener, NotificationListener +{ + /** + * @param tag tag name to be checked + * @return true if given tag for ProgressEvent is a target to consume. If this returns false, then + * {@link #progress} is not called for that event. + */ + public abstract boolean isInterestedIn(String tag); + + /** + * Called when receiving {@link JMXConnectionNotification#NOTIFS_LOST} message. + */ + public void handleNotificationLost(long timestamp, String message) {} + + /** + * Called when JMX connection is closed. + * Specifically when {@link JMXConnectionNotification#CLOSED} message is received. + */ + public void handleConnectionClosed(long timestamp, String message) {} + + /** + * Called when JMX connection is failed. + * Specifically when {@link JMXConnectionNotification#FAILED} message is received. 
+ */ + public void handleConnectionFailed(long timestamp, String message) {} + + @SuppressWarnings("unchecked") + @Override + public void handleNotification(Notification notification, Object handback) + { + switch (notification.getType()) + { + case "progress": + String tag = (String) notification.getSource(); + if (this.isInterestedIn(tag)) + { + Map<String, Integer> progress = (Map<String, Integer>) notification.getUserData(); + String message = notification.getMessage(); + ProgressEvent event = new ProgressEvent(ProgressEventType.values()[progress.get("type")], + progress.get("progressCount"), + progress.get("total"), + message); + this.progress(tag, event); + } + break; + + case JMXConnectionNotification.NOTIFS_LOST: + handleNotificationLost(notification.getTimeStamp(), notification.getMessage()); + break; + + case JMXConnectionNotification.FAILED: + handleConnectionFailed(notification.getTimeStamp(), notification.getMessage()); + break; + + case JMXConnectionNotification.CLOSED: + handleConnectionClosed(notification.getTimeStamp(), notification.getMessage()); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java new file mode 100644 index 0000000..12efd0d --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.progress.jmx; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; + +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressListener; + +/** + * ProgressListener that translates ProgressEvent to JMX Notification message. + */ +public class JMXProgressSupport implements ProgressListener +{ + private final AtomicLong notificationSerialNumber = new AtomicLong(); + + private final NotificationBroadcasterSupport broadcaster; + + public JMXProgressSupport(NotificationBroadcasterSupport broadcaster) + { + this.broadcaster = broadcaster; + } + + @Override + public void progress(String tag, ProgressEvent event) + { + Notification notification = new Notification("progress", + tag, + notificationSerialNumber.getAndIncrement(), + System.currentTimeMillis(), + event.getMessage()); + Map<String, Integer> userData = new HashMap<>(); + userData.put("type", event.getType().ordinal()); + userData.put("progressCount", event.getProgressCount()); + userData.put("total", event.getTotal()); + notification.setUserData(userData); + broadcaster.sendNotification(notification); + } +}
