Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/118bea59 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/118bea59 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/118bea59 Branch: refs/heads/cassandra-3.0 Commit: 118bea59e4a0bdc0b9cca9d717685f1157764c15 Parents: 2bacc9a 0318046 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu May 26 08:18:21 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu May 26 08:18:21 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../repair/RepairMessageVerbHandler.java | 9 +- .../apache/cassandra/repair/RepairRunnable.java | 2 +- .../cassandra/service/ActiveRepairService.java | 92 ++++++++++++++++++-- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 3 +- .../service/ActiveRepairServiceTest.java | 2 +- 7 files changed, 95 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 6a952c4,f73db6e..190c2fa --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,24 -1,5 +1,25 @@@ -2.1.15 +2.2.7 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840) * Do not consider local node a valid source during replace (CASSANDRA-11848) * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 8a0706a,7debc93..6e7922f --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -66,26 -66,14 +66,27 @@@ public class RepairMessageVerbHandler i List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size()); for (UUID cfId : prepareMessage.cfIds) { - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(cfId); + if (columnFamilyStore == null) + { + logErrorAndSendFailureResponse(String.format("Table with id %s was dropped during prepare phase of repair", + cfId.toString()), message.from, id); + return; + } columnFamilyStores.add(columnFamilyStore); } + CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from); + // note that we default isGlobal to true since old version always default to true: + boolean isGlobal = peerVersion == null || + peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 || + message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE); + logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion); ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, - columnFamilyStores, - prepareMessage.ranges, - prepareMessage.isIncremental, - isGlobal); + message.from, + columnFamilyStores, - prepareMessage.ranges); ++ prepareMessage.ranges, ++ prepareMessage.isIncremental, ++ isGlobal); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index d2b6ab6,0000000..b849cf8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -1,409 -1,0 +1,409 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.*; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventNotifier; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; + +public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier +{ + private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); + + private StorageService storageService; + private final int cmd; + private final RepairOption options; + private final String keyspace; + + private final List<ProgressListener> listeners = new ArrayList<>(); + + public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) + { + this.storageService = storageService; + this.cmd = cmd; + this.options = options; + this.keyspace = keyspace; + } + + @Override + public void addProgressListener(ProgressListener listener) + { + listeners.add(listener); + } + + @Override + public void removeProgressListener(ProgressListener listener) + { + listeners.remove(listener); + } + + protected void fireProgressEvent(String tag, ProgressEvent event) + { + for (ProgressListener listener : listeners) + { + listener.progress(tag, event); + } + } + + protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress)); + } + + protected void runMayThrow() throws Exception + { + final TraceState traceState; + + final String tag = "repair:" + cmd; + + final AtomicInteger progress = new AtomicInteger(); + final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair + + String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); + Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, + columnFamilies); + + final long startTime = System.currentTimeMillis(); + String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, + options); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); + if (options.isTraced()) + { + StringBuilder cfsb = new StringBuilder(); + for (ColumnFamilyStore cfs : validColumnFamilies) + cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); + + UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); + traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", + cfsb.substring(2))); + Tracing.traceRepair(message); + traceState.enableActivityNotification(tag); + for (ProgressListener listener : listeners) + traceState.addProgressListener(listener); + Thread queryThread = createQueryThread(cmd, sessionId); + queryThread.setName("RepairTracePolling"); + queryThread.start(); + } + else + { + traceState = null; + } + + final Set<InetAddress> allNeighbors = new HashSet<>(); + Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); + try + { + for (Range<Token> range : options.getRanges()) + { + Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, + options.getDataCenters(), + options.getHosts()); + rangeToNeighbors.put(range, neighbors); + allNeighbors.addAll(neighbors); + } + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + logger.error("Repair failed:", e); + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + // Validate columnfamilies + List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); + try + { + Iterables.addAll(columnFamilyStores, validColumnFamilies); + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + String[] cfnames = new String[columnFamilyStores.size()]; + for (int i = 0; i < columnFamilyStores.size(); i++) + { + cfnames[i] = columnFamilyStores.get(i).name; + } + + final UUID parentSession = UUIDGen.getTimeUUID(); + SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges()); + long repairedAt; + try + { - ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores); ++ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores); + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt(); + progress.incrementAndGet(); + } + catch (Throwable t) + { + SystemDistributedKeyspace.failParentRepair(parentSession, t); + fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); + return; + } + + // Set up RepairJob executor for this repair command. + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); + for (Range<Token> range : options.getRanges()) + { + final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + range, + keyspace, + options.getParallelism(), + rangeToNeighbors.get(range), + repairedAt, + executor, + cfnames); + if (session == null) + continue; + // After repair session completes, notify client its result + Futures.addCallback(session, new FutureCallback<RepairSessionResult>() + { + public void onSuccess(RepairSessionResult result) + { + /** + * If the success message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.getRange().toString()); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + + public void onFailure(Throwable t) + { + /** + * If the failure message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.getRange().toString(), t.getMessage()); + logger.error(message, t); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + }); + futures.add(session); + } + + // After all repair sessions completes(successful or not), + // run anticompaction if necessary and send finish notice back to client + final Collection<Range<Token>> successfulRanges = new ArrayList<>(); + final AtomicBoolean hasFailure = new AtomicBoolean(); + final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); + ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>() + { + @SuppressWarnings("unchecked") + public ListenableFuture apply(List<RepairSessionResult> results) throws Exception + { + // filter out null(=failed) results and get successful ranges + for (RepairSessionResult sessionResult : results) + { + if (sessionResult != null) + { + successfulRanges.add(sessionResult.range); + } + else + { + hasFailure.compareAndSet(false, true); + } + } + return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); + } + }); + Futures.addCallback(anticompactionResult, new FutureCallback<Object>() + { + public void onSuccess(Object result) + { + SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); + if (hasFailure.get()) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, + "Some repair failed")); + } + else + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + "Repair completed successfully")); + } + repairComplete(); + } + + public void onFailure(Throwable t) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + SystemDistributedKeyspace.failParentRepair(parentSession, t); + repairComplete(); + } + + private void repairComplete() + { + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); + String message = String.format("Repair command #%d finished in %s", cmd, duration); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + logger.info(message); + if (options.isTraced() && traceState != null) + { + for (ProgressListener listener : listeners) + traceState.removeProgressListener(listener); + // Because DebuggableThreadPoolExecutor#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. + Tracing.instance.set(traceState); + Tracing.traceRepair(message); + Tracing.instance.stopSession(); + } + executor.shutdownNow(); + } + }); + } + + private Thread createQueryThread(final int cmd, final UUID sessionId) + { + return new Thread(new WrappedRunnable() + { + // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. + // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. + public void runMayThrow() throws Exception + { + TraceState state = Tracing.instance.get(sessionId); + if (state == null) + throw new Exception("no tracestate"); + + String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;"; + String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS); + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; + + ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); + InetAddress source = FBUtilities.getBroadcastAddress(); + + HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() }; + int si = 0; + UUID uuid; + + long tlast = System.currentTimeMillis(), tcur; + + TraceState.Status status; + long minWaitMillis = 125; + long maxWaitMillis = 1000 * 1024L; + long timeout = minWaitMillis; + boolean shouldDouble = false; + + while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) + { + if (status == TraceState.Status.IDLE) + { + timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout; + shouldDouble = !shouldDouble; + } + else + { + timeout = minWaitMillis; + shouldDouble = false; + } + ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000)); + ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis())); + QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, + tminBytes, + tmaxBytes)); + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); + UntypedResultSet result = UntypedResultSet.create(rows.result); + + for (UntypedResultSet.Row r : result) + { + if (source.equals(r.getInetAddress("source"))) + continue; + if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) + seen[si].add(uuid); + if (seen[si == 0 ? 1 : 0].contains(uuid)) + continue; + String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); + fireProgressEvent("repair:" + cmd, + new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message)); + } + tlast = tcur; + + si = si == 0 ? 1 : 0; + seen[si].clear(); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 1ea5aaf,f8975f9..5d010f9 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -35,16 -33,21 +35,22 @@@ import com.google.common.util.concurren import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; + import org.apache.cassandra.gms.ApplicationState; + import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IFailureDetector; + import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; + import org.apache.cassandra.gms.IFailureDetectionEventListener; + import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@@ -75,26 -78,31 +81,27 @@@ import org.apache.cassandra.utils.concu * The creation of a repair session is done through the submitRepairSession that * returns a future on the completion of that session. */ - public class ActiveRepairService + public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { - private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); - // singleton enforcement - public static final ActiveRepairService instance = new ActiveRepairService(); - - public static final long UNREPAIRED_SSTABLE = 0; - - private static final ThreadPoolExecutor executor; - private boolean registeredForEndpointChanges = false; - - static - { - executor = new JMXConfigurableThreadPoolExecutor(4, - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("AntiEntropySessions"), - "internal"); - } - + /** + * @deprecated this statuses are from the previous JMX notification service, + * which will be deprecated on 4.0. For statuses of the new notification + * service, see {@link org.apache.cassandra.streaming.StreamEvent.ProgressEvent} + */ + @Deprecated public static enum Status { STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED } ++ private boolean registeredForEndpointChanges = false; + + public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1"); + + private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); + // singleton enforcement + public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance); + + public static final long UNREPAIRED_SSTABLE = 0; /** * A map of active coordinator session. @@@ -246,9 -252,10 +253,9 @@@ return neighbors; } - public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) - public synchronized UUID prepareForRepair(InetAddress coordinator, Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores) ++ public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal()); - UUID parentRepairSession = UUIDGen.getTimeUUID(); - registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, ranges); ++ registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@@ -313,9 -317,36 +320,15 @@@ return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal) - public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges) ++ public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis())); + if (!registeredForEndpointChanges) + { + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + registeredForEndpointChanges = true; + } - - cleanupOldParentRepairSessions(); - - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, System.currentTimeMillis())); - } - - /** - * Cleans up old failed parent repair sessions - if it is 24h old, we remove it from the map - */ - private void cleanupOldParentRepairSessions() - { - long currentTime = System.currentTimeMillis(); - - Set<UUID> expired = new HashSet<>(); - for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet()) - { - ParentRepairSession session = entry.getValue(); - if (session.failed && currentTime - session.repairedAt > TimeUnit.HOURS.toMillis(24)) - expired.add(entry.getKey()); - } - for (UUID remove : expired) - parentRepairSessions.remove(remove); ++ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis())); } public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) @@@ -353,7 -394,13 +366,13 @@@ public ParentRepairSession getParentRepairSession(UUID parentSessionId) { - return parentRepairSessions.get(parentSessionId); + ParentRepairSession session = parentRepairSessions.get(parentSessionId); + // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing + // that it was down itself. - if (session != null && session.failed) ++ if (session == null) + throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed."); + + return session; } public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId) @@@ -432,17 -465,26 +451,19 @@@ public static class ParentRepairSession { - public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); - public final Collection<Range<Token>> ranges; + private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); + private final Collection<Range<Token>> ranges; public final Map<UUID, Set<String>> sstableMap = new HashMap<>(); - private final long repairedAt; - /** - * used as fail time if failed is true - */ + public final boolean isIncremental; + public final boolean isGlobal; + public final long repairedAt; + public final InetAddress coordinator; - /** - * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a - * request, we need to fail the coordinator as well. - */ - public final boolean failed; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed) ++ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) { + this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) { - this.columnFamilyStores.put(cfs.metadata.cfId, cfs); sstableMap.put(cfs.metadata.cfId, new HashSet<String>()); } @@@ -492,12 -540,10 +513,13 @@@ } } - public ParentRepairSession asFailed() + public long getRepairedAt() { - return new ParentRepairSession(coordinator, Collections.<ColumnFamilyStore>emptyList(), Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true); + if (isGlobal) + return repairedAt; + return ActiveRepairService.UNREPAIRED_SSTABLE; } ++ @Override public String toString() { @@@ -509,4 -555,61 +531,58 @@@ '}'; } } + + /* + If the coordinator node dies we should remove the parent repair session from the other nodes. + This uses the same notifications as we get in RepairSession + */ + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState state) + { + convict(endpoint, Double.MAX_VALUE); + } + + /** + * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed. + * + * The fail marker is kept in the map for 24h to make sure that if the coordinator does not agree + * that the repair failed, we need to fail the entire repair session + * + * @param ep endpoint to be convicted + * @param phi the value of phi with with ep was convicted + */ + public void convict(InetAddress ep, double phi) + { + // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. + if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty()) + return; + + Set<UUID> toRemove = new HashSet<>(); + + for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : parentRepairSessions.entrySet()) + { + if (repairSessionEntry.getValue().coordinator.equals(ep)) + { + toRemove.add(repairSessionEntry.getKey()); + } + } + + if (!toRemove.isEmpty()) + { - logger.debug("Failing {} in parent repair sessions", toRemove); ++ logger.debug("Removing {} in parent repair sessions", toRemove); + for (UUID id : toRemove) - { - ParentRepairSession failed = parentRepairSessions.get(id); - parentRepairSessions.replace(id, failed, failed.asFailed()); - } ++ parentRepairSessions.remove(id); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 0ec854a,6ec4c7b..8b9ca08 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -196,10 -106,10 +196,10 @@@ public class LeveledCompactionStrategyT assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); Range<Token> range = new Range<>(Util.token(""), Util.token("")); - int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis()); + int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range)); - RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, true); + RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index e5c03b9,0000000..892ced1 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@@ -1,129 -1,0 +1,130 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.service.ActiveRepairService; ++import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; + +import static org.junit.Assert.assertEquals; + +public class LocalSyncTaskTest extends SchemaLoader +{ + private static final IPartitioner partirioner = Murmur3Partitioner.instance; + public static final String KEYSPACE1 = "DifferencerTest"; + public static final String CF_STANDARD = "Standard1"; + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); + } + + /** + * When there is no difference between two, LocalSyncTask should return stats with 0 difference. + */ + @Test + public void testNoDifference() throws Throwable + { + final InetAddress ep1 = InetAddress.getByName("127.0.0.1"); + final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); + + Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range); + + MerkleTree tree1 = createInitialTree(desc); + MerkleTree tree2 = createInitialTree(desc); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(ep1, tree1); + TreeResponse r2 = new TreeResponse(ep2, tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + task.run(); + + assertEquals(0, task.get().numberOfDifferences); + } + + @Test + public void testDifference() throws Throwable + { + Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + UUID parentRepairSession = UUID.randomUUID(); + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); + - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, false); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); + + MerkleTree tree1 = createInitialTree(desc); + MerkleTree tree2 = createInitialTree(desc); + + // change a range in one of the trees + Token token = partirioner.midpoint(range.left, range.right); + tree1.invalidate(token); + MerkleTree.TreeRange changed = tree1.get(token); + changed.hash("non-empty hash!".getBytes()); + + Set<Range<Token>> interesting = new HashSet<>(); + interesting.add(changed); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); + TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + task.run(); + + // ensure that the changed range was recorded + assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); + } + + private MerkleTree createInitialTree(RepairJobDesc desc) + { + MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)); + tree.init(); + for (MerkleTree.TreeRange r : tree.invalids()) + { + r.ensureHashInitialised(); + } + return tree; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index b4066d7,26e5126..7793660 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -232,7 -56,7 +232,7 @@@ public class ActiveRepairServiceTes Set<SSTableReader> original = store.getUnrepairedSSTables(); UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, false); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); //add all sstables to parent repair session