This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c343175 Add addition incremental repair visibility to nodetool repair_admin c343175 is described below commit c34317526fc6dbe559beb36cf44e24278656bdf2 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Tue Aug 4 16:38:33 2020 -0700 Add addition incremental repair visibility to nodetool repair_admin Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14939 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 48 +++ .../db/compaction/CompactionStrategyManager.java | 25 ++ .../db/compaction/PendingRepairManager.java | 70 ++++- src/java/org/apache/cassandra/dht/Range.java | 17 ++ .../repair/consistent/ConsistentSession.java | 5 + .../cassandra/repair/consistent/LocalSession.java | 6 +- .../cassandra/repair/consistent/LocalSessions.java | 145 ++++++++- .../cassandra/repair/consistent/RepairedState.java | 339 +++++++++++++++++++++ .../repair/consistent/admin/CleanupSummary.java | 143 +++++++++ .../repair/consistent/admin/PendingStat.java | 143 +++++++++ .../repair/consistent/admin/PendingStats.java | 104 +++++++ .../repair/consistent/admin/RepairStats.java | 187 ++++++++++++ .../repair/consistent/admin/SchemaArgsParser.java | 117 +++++++ .../cassandra/repair/messages/RepairOption.java | 48 +-- .../cassandra/service/ActiveRepairService.java | 81 ++++- .../service/ActiveRepairServiceMBean.java | 7 +- src/java/org/apache/cassandra/tools/NodeTool.java | 10 +- .../cassandra/tools/nodetool/RepairAdmin.java | 322 ++++++++++++++----- test/unit/org/apache/cassandra/dht/RangeTest.java | 17 ++ .../cassandra/repair/AbstractRepairTest.java | 5 + .../repair/consistent/LocalSessionTest.java | 22 +- .../repair/consistent/PendingRepairStatTest.java | 184 +++++++++++ .../repair/consistent/RepairStateTest.java | 168 ++++++++++ .../consistent/admin/SchemaArgsParserTest.java | 91 ++++++ 25 files changed, 2198 insertions(+), 107 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b8ace5b..123efdf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta2 + * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939) * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876) * Remove deprecated HintedHandOffManager (CASSANDRA-15939) * Prevent repair from overrunning compaction (CASSANDRA-15817) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 77a8cdd..824005b 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -75,6 +75,8 @@ import org.apache.cassandra.metrics.Sampler.Sample; import org.apache.cassandra.metrics.Sampler.SamplerType; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.repair.TableRepairManager; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; +import org.apache.cassandra.repair.consistent.admin.PendingStat; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.CacheService; @@ -1555,6 +1557,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return data.getUncompacting(); } + public Map<UUID, PendingStat> getPendingRepairStats() + { + Map<UUID, PendingStat.Builder> builders = new HashMap<>(); + for (SSTableReader sstable : getLiveSSTables()) + { + UUID session = sstable.getPendingRepair(); + if (session == null) + continue; + + if (!builders.containsKey(session)) + builders.put(session, new PendingStat.Builder()); + + builders.get(session).addSSTable(sstable); + } + + Map<UUID, PendingStat> stats = new HashMap<>(); + for (Map.Entry<UUID, PendingStat.Builder> entry : builders.entrySet()) + { + stats.put(entry.getKey(), entry.getValue().build()); + } + return stats; + } + + /** + * promotes (or demotes) data attached to an incremental repair session that has either completed successfully, + * or failed + * + * @return session ids whose data could not be released + */ + public CleanupSummary releaseRepairData(Collection<UUID> sessions, boolean force) + { + if (force) + { + Predicate<SSTableReader> predicate = sst -> { + UUID session = sst.getPendingRepair(); + return session != null && sessions.contains(session); + }; + return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions), + predicate, false, true, true); + } + else + { + return compactionStrategyManager.releaseRepairData(sessions); + } + } + public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 708faff..3a05e50 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -39,6 +39,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; + +import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +71,7 @@ import org.apache.cassandra.notifications.SSTableDeletingNotification; import org.apache.cassandra.notifications.SSTableListChangedNotification; import org.apache.cassandra.notifications.SSTableMetadataChanged; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; @@ -1223,4 +1226,26 @@ public class CompactionStrategyManager implements INotificationConsumer if (isTransient != sstable.isTransient()) throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient())); } + + public CleanupSummary releaseRepairData(Collection<UUID> sessions) + { + List<CleanupTask> cleanupTasks = new ArrayList<>(); + readLock.lock(); + try + { + for (PendingRepairManager prm : Iterables.concat(pendingRepairs.getManagers(), transientRepairs.getManagers())) + cleanupTasks.add(prm.releaseSessionData(sessions)); + } + finally + { + readLock.unlock(); + } + + CleanupSummary summary = new CleanupSummary(cfs, Collections.emptySet(), Collections.emptySet()); + + for (CleanupTask task : cleanupTasks) + summary = CleanupSummary.add(summary, task.cleanup()); + + return summary; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index 764a4dc..aefa40b 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.Pair; @@ -263,12 +264,79 @@ class PendingRepairManager @SuppressWarnings("resource") private RepairFinishedCompactionTask getRepairFinishedCompactionTask(UUID sessionID) { - Set<SSTableReader> sstables = get(sessionID).getSSTables(); + Preconditions.checkState(canCleanup(sessionID)); + AbstractCompactionStrategy compactionStrategy = get(sessionID); + if (compactionStrategy == null) + return null; + Set<SSTableReader> sstables = compactionStrategy.getSSTables(); long repairedAt = ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt(sessionID); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); return txn == null ? null : new RepairFinishedCompactionTask(cfs, txn, sessionID, repairedAt); } + public static class CleanupTask + { + private final ColumnFamilyStore cfs; + private final List<Pair<UUID, RepairFinishedCompactionTask>> tasks; + + public CleanupTask(ColumnFamilyStore cfs, List<Pair<UUID, RepairFinishedCompactionTask>> tasks) + { + this.cfs = cfs; + this.tasks = tasks; + } + + public CleanupSummary cleanup() + { + Set<UUID> successful = new HashSet<>(); + Set<UUID> unsuccessful = new HashSet<>(); + for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks) + { + UUID session = pair.left; + RepairFinishedCompactionTask task = pair.right; + + if (task != null) + { + try + { + task.run(); + successful.add(session); + } + catch (Throwable t) + { + t = task.transaction.abort(t); + logger.error("Failed cleaning up " + session, t); + unsuccessful.add(session); + } + } + else + { + unsuccessful.add(session); + } + } + return new CleanupSummary(cfs, successful, unsuccessful); + } + + public Throwable abort(Throwable accumulate) + { + for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks) + accumulate = pair.right.transaction.abort(accumulate); + return accumulate; + } + } + + public CleanupTask releaseSessionData(Collection<UUID> sessionIDs) + { + List<Pair<UUID, RepairFinishedCompactionTask>> tasks = new ArrayList<>(sessionIDs.size()); + for (UUID session : sessionIDs) + { + if (hasDataForSession(session)) + { + tasks.add(Pair.create(session, getRepairFinishedCompactionTask(session))); + } + } + return new CleanupTask(cfs, tasks); + } + synchronized int getNumPendingRepairFinishedTasks() { int count = 0; diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 80c9ef1..5b2f3d9 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.*; import java.util.function.Predicate; +import com.google.common.collect.Iterables; import org.apache.commons.lang3.ObjectUtils; import org.apache.cassandra.db.PartitionPosition; @@ -139,6 +140,11 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen return contains(that.left) || (!that.left.equals(that.right) && intersects(new Range<T>(that.left, that.right))); } + public static boolean intersects(Iterable<Range<Token>> l, Iterable<Range<Token>> r) + { + return Iterables.any(l, rng -> rng.intersects(r)); + } + @SafeVarargs public static <T extends RingPosition<T>> Set<Range<T>> rangeSet(Range<T> ... ranges) { @@ -335,6 +341,17 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen return result; } + + public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Range<T>> ranges, Collection<Range<T>> subtract) + { + Set<Range<T>> result = new HashSet<>(); + for (Range<T> range : ranges) + { + result.addAll(range.subtractAll(subtract)); + } + return result; + } + /** * Calculate set of the difference ranges of given two ranges * (as current (A, B] and rhs is (C, D]) diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java index d9ac927..e4d8ff0 100644 --- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -216,6 +216,11 @@ public abstract class ConsistentSession this.state = state; } + public boolean intersects(Iterable<Range<Token>> otherRanges) + { + return Iterables.any(ranges, r -> r.intersects(otherRanges)); + } + public boolean equals(Object o) { if (this == o) return true; diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java index e116a43..a6f81d7 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java @@ -98,14 +98,16 @@ public class LocalSession extends ConsistentSession private int startedAt; private int lastUpdate; - public void withStartedAt(int startedAt) + public Builder withStartedAt(int startedAt) { this.startedAt = startedAt; + return this; } - public void withLastUpdate(int lastUpdate) + public Builder withLastUpdate(int lastUpdate) { this.lastUpdate = lastUpdate; + return this; } void validate() diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index a35c50a..55fe2f0 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -41,10 +41,12 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -59,6 +61,9 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.repair.KeyspaceRepairManager; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; +import org.apache.cassandra.repair.consistent.admin.PendingStat; +import org.apache.cassandra.repair.consistent.admin.PendingStats; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.locator.InetAddressAndPort; @@ -148,6 +153,7 @@ public class LocalSessions private final String table = SystemKeyspace.REPAIRS; private boolean started = false; private volatile ImmutableMap<UUID, LocalSession> sessions = ImmutableMap.of(); + private volatile ImmutableMap<TableId, RepairedState> repairedStates = ImmutableMap.of(); @VisibleForTesting int getNumSessions() @@ -173,16 +179,137 @@ public class LocalSessions return StorageService.instance.isInitialized(); } - public List<Map<String, String>> sessionInfo(boolean all) + public List<Map<String, String>> sessionInfo(boolean all, Set<Range<Token>> ranges) { Iterable<LocalSession> currentSessions = sessions.values(); + if (!all) - { currentSessions = Iterables.filter(currentSessions, s -> !s.isCompleted()); - } + + if (!ranges.isEmpty()) + currentSessions = Iterables.filter(currentSessions, s -> s.intersects(ranges)); + return Lists.newArrayList(Iterables.transform(currentSessions, LocalSessionInfo::sessionToMap)); } + private RepairedState getRepairedState(TableId tid) + { + if (!repairedStates.containsKey(tid)) + { + synchronized (this) + { + if (!repairedStates.containsKey(tid)) + { + repairedStates = ImmutableMap.<TableId, RepairedState>builder() + .putAll(repairedStates) + .put(tid, new RepairedState()) + .build(); + } + } + } + return Verify.verifyNotNull(repairedStates.get(tid)); + } + + private void maybeUpdateRepairedState(LocalSession session) + { + if (session.getState() != FINALIZED) + return; + + // if the session is finalized but has repairedAt set to 0, it was + // a forced repair, and we shouldn't update the repaired state + if (session.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + return; + + for (TableId tid : session.tableIds) + { + RepairedState state = getRepairedState(tid); + state.add(session.ranges, session.repairedAt); + } + } + + /** + * Determine if all ranges and tables covered by this session + * have since been re-repaired by a more recent session + */ + private boolean isSuperseded(LocalSession session) + { + for (TableId tid : session.tableIds) + { + RepairedState state = repairedStates.get(tid); + + if (state == null) + return false; + + long minRepaired = state.minRepairedAt(session.ranges); + if (minRepaired <= session.repairedAt) + return false; + } + + return true; + } + + public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges) + { + RepairedState state = repairedStates.get(tid); + + if (state == null) + return RepairedState.Stats.EMPTY; + + return state.getRepairedStats(ranges); + } + + public PendingStats getPendingStats(TableId tid, Collection<Range<Token>> ranges) + { + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + Preconditions.checkArgument(cfs != null); + + PendingStat.Builder pending = new PendingStat.Builder(); + PendingStat.Builder finalized = new PendingStat.Builder(); + PendingStat.Builder failed = new PendingStat.Builder(); + + Map<UUID, PendingStat> stats = cfs.getPendingRepairStats(); + for (Map.Entry<UUID, PendingStat> entry : stats.entrySet()) + { + UUID sessionID = entry.getKey(); + PendingStat stat = entry.getValue(); + Verify.verify(sessionID.equals(Iterables.getOnlyElement(stat.sessions))); + + LocalSession session = sessions.get(sessionID); + Verify.verifyNotNull(session); + + if (!Iterables.any(ranges, r -> r.intersects(session.ranges))) + continue; + + switch (session.getState()) + { + case FINALIZED: + finalized.addStat(stat); + break; + case FAILED: + failed.addStat(stat); + break; + default: + pending.addStat(stat); + } + } + + return new PendingStats(cfs.keyspace.getName(), cfs.name, pending.build(), finalized.build(), failed.build()); + } + + public CleanupSummary cleanup(TableId tid, Collection<Range<Token>> ranges, boolean force) + { + Iterable<LocalSession> candidates = Iterables.filter(sessions.values(), + ls -> ls.isCompleted() + && ls.tableIds.contains(tid) + && Range.intersects(ls.ranges, ranges)); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + Set<UUID> sessionIds = Sets.newHashSet(Iterables.transform(candidates, s -> s.sessionID)); + + + return cfs.releaseRepairData(sessionIds, force); + } + /** * hook for operators to cancel sessions, cancelling from a non-coordinator is an error, unless * force is set to true. Messages are sent out to other participants, but we don't wait for a response @@ -219,6 +346,7 @@ public class LocalSessions try { LocalSession session = load(row); + maybeUpdateRepairedState(session); loadedSessions.put(session.sessionID, session); } catch (IllegalArgumentException | NullPointerException e) @@ -275,7 +403,14 @@ public class LocalSessions } else if (shouldDelete(session, now)) { - if (!sessionHasData(session)) + if (session.getState() == FINALIZED && !isSuperseded(session)) + { + // if we delete a non-superseded session, some ranges will be mis-reported as + // not having been repaired in repair_admin after a restart + logger.info("Skipping delete of FINALIZED LocalSession {} because it has " + + "not been superseded by a more recent session", session.sessionID); + } + else if (!sessionHasData(session)) { logger.info("Auto deleting repair session {}", session); deleteSession(session.sessionID); @@ -370,6 +505,8 @@ public class LocalSessions session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()), serializeRanges(session.ranges), tableIdToUuid(session.tableIds)); + + maybeUpdateRepairedState(session); } private static int dateToSeconds(Date d) diff --git a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java new file mode 100644 index 0000000..ac0e7cb --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.Sets; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; + +/** + * Tracks the repaired state of token ranges per table, and is effectively an + * in memory representation of the on-disk local incremental repair state. + * + * The main purpose of this class is to provide metrics via nodetool repair_admin. To make sure those metrics + * are accurate, it also determines when a completed IR session can be deleted, which is explained in a bit + * more detail in LocalSessions#cleanup, by the call to isSuperseded. + */ +public class RepairedState +{ + static class Level + { + final List<Range<Token>> ranges; + final long repairedAt; + + private static final Comparator<Level> timeComparator = Comparator.comparingLong(l -> -l.repairedAt); + + Level(Collection<Range<Token>> ranges, long repairedAt) + { + this.ranges = Range.normalize(ranges); + this.repairedAt = repairedAt; + } + + Level subtract(Collection<Range<Token>> ranges) + { + if (ranges.isEmpty()) + return this; + + Set<Range<Token>> difference = Range.subtract(this.ranges, ranges); + if (difference.isEmpty()) + return null; + + return new Level(difference, repairedAt); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Level level = (Level) o; + return repairedAt == level.repairedAt && + Objects.equals(ranges, level.ranges); + } + + public int hashCode() + { + return Objects.hash(ranges, repairedAt); + } + + @Override + public String toString() + { + return "Level{" + + "ranges=" + ranges + + ", repairedAt=" + repairedAt + + '}'; + } + } + + public static class Section + { + public final Range<Token> range; + public final long repairedAt; + private static final Comparator<Section> tokenComparator = (l, r) -> l.range.left.compareTo(r.range.left); + + Section(Range<Token> range, long repairedAt) + { + this.range = range; + this.repairedAt = repairedAt; + } + + Section makeSubsection(Range<Token> subrange) + { + Preconditions.checkArgument(range.contains(subrange)); + return new Section(subrange, repairedAt); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Section section = (Section) o; + return repairedAt == section.repairedAt && + Objects.equals(range, section.range); + } + + public int hashCode() + { + + return Objects.hash(range, repairedAt); + } + + @Override + public String toString() + { + return "Section{" + + "range=" + range + + ", repairedAt=" + repairedAt + + '}'; + } + } + + public static class Stats + { + public static final Stats EMPTY = new Stats(UNREPAIRED_SSTABLE, UNREPAIRED_SSTABLE, Collections.emptyList()); + + public final long minRepaired; + public final long maxRepaired; + public final List<Section> sections; + + public Stats(long minRepaired, long maxRepaired, List<Section> sections) + { + this.minRepaired = minRepaired; + this.maxRepaired = maxRepaired; + this.sections = sections; + } + + + } + + static class State + { + final ImmutableList<Level> levels; + final ImmutableList<Range<Token>> covered; + final ImmutableList<Section> sections; + + State(List<Level> levels, List<Range<Token>> covered, List<Section> sections) + { + this.levels = ImmutableList.copyOf(levels); + this.covered = ImmutableList.copyOf(covered); + this.sections = ImmutableList.copyOf(sections); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + State state = (State) o; + return Objects.equals(levels, state.levels) && + Objects.equals(covered, state.covered); + } + + public int hashCode() + { + return Objects.hash(levels, covered); + } + + @Override + public String toString() + { + return "State{" + + "levels=" + levels + + ", covered=" + covered + + '}'; + } + } + + private volatile State state = new State(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + + State state() + { + return state; + } + + private static List<Section> levelsToSections(List<Level> levels) + { + List<Section> sections = new ArrayList<>(); + for (Level level : levels) + { + for (Range<Token> range : level.ranges) + { + sections.add(new Section(range, level.repairedAt)); + } + } + sections.sort(Section.tokenComparator); + return sections; + } + + public synchronized void add(Collection<Range<Token>> ranges, long repairedAt) + { + Level newLevel = new Level(ranges, repairedAt); + + State lastState = state; + + List<Level> tmp = new ArrayList<>(lastState.levels.size() + 1); + tmp.addAll(lastState.levels); + tmp.add(newLevel); + tmp.sort(Level.timeComparator); + + List<Level> levels = new ArrayList<>(lastState.levels.size() + 1); + List<Range<Token>> covered = new ArrayList<>(); + + for (Level level : tmp) + { + Level subtracted = level.subtract(covered); + if (subtracted == null) + continue; + + levels.add(subtracted); + + covered.addAll(subtracted.ranges); + covered = Range.normalize(covered); + } + + List<Section> sections = new ArrayList<>(); + for (Level level : levels) + { + for (Range<Token> range : level.ranges) + { + sections.add(new Section(range, level.repairedAt)); + } + } + sections.sort(Section.tokenComparator); + + state = new State(levels, covered, sections); + } + + public long minRepairedAt(Collection<Range<Token>> ranges) + { + State current = state; + + Set<Range<Token>> remainingRanges = new HashSet<>(ranges); + long minTime = Long.MAX_VALUE; + for (Section section : current.sections) + { + if (section.range.intersects(remainingRanges)) + { + minTime = Math.min(minTime, section.repairedAt); + remainingRanges = Range.subtract(remainingRanges, Collections.singleton(section.range)); + } + + if (remainingRanges.isEmpty()) + break; + } + // if there are still ranges we don't have data for, part of the requested ranges is unrepaired + return remainingRanges.isEmpty() ? minTime : UNREPAIRED_SSTABLE; + } + + static List<Section> getRepairedStats(List<Section> sections, Collection<Range<Token>> ranges) + { + if (ranges.isEmpty()) + return Collections.emptyList(); + + Set<Range<Token>> remaining = Sets.newHashSet(Range.normalize(ranges)); + List<Section> results = new ArrayList<>(); + + for (Section section : sections) + { + if (remaining.isEmpty()) + break; + + Set<Range<Token>> sectionRanges = Range.rangeSet(section.range); + for (Range<Token> range : remaining) + { + if (sectionRanges.isEmpty()) + break; + + Set<Range<Token>> intersection = new HashSet<>(); + sectionRanges.forEach(r -> intersection.addAll(r.intersectionWith(range))); + + if (intersection.isEmpty()) + continue; + + intersection.forEach(r -> results.add(section.makeSubsection(r))); + sectionRanges = Range.subtract(sectionRanges, intersection); + } + + remaining = Range.subtract(remaining, Collections.singleton(section.range)); + } + + remaining.forEach(r -> results.add(new Section(r, UNREPAIRED_SSTABLE))); + results.sort(Section.tokenComparator); + + return results; + } + + public Stats getRepairedStats(Collection<Range<Token>> ranges) + { + List<Section> sections = getRepairedStats(state.sections, ranges); + + if (sections.isEmpty()) + return new Stats(UNREPAIRED_SSTABLE, UNREPAIRED_SSTABLE, Collections.emptyList()); + + long minTime = Long.MAX_VALUE; + long maxTime = Long.MIN_VALUE; + + for (Section section : sections) + { + minTime = Math.min(minTime, section.repairedAt); + maxTime = Math.max(maxTime, section.repairedAt); + } + + return new Stats(minTime, maxTime, sections); + } +} diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java new file mode 100644 index 0000000..f984411 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; + +import org.apache.cassandra.db.ColumnFamilyStore; + +public class CleanupSummary +{ + private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "successful", "unsuccessful" }; + private static final OpenType<?>[] COMPOSITE_TYPES; + private static final CompositeType COMPOSITE_TYPE; + + static + { + try + { + COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, + SimpleType.STRING, + ArrayType.getArrayType(SimpleType.STRING), + ArrayType.getArrayType(SimpleType.STRING) }; + COMPOSITE_TYPE = new CompositeType(RepairStats.Section.class.getName(), "PendingStats", + COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public final String keyspace; + public final String table; + + public final Set<UUID> successful; + public final Set<UUID> unsuccessful; + + public CleanupSummary(String keyspace, String table, Set<UUID> successful, Set<UUID> unsuccessful) + { + this.keyspace = keyspace; + this.table = table; + this.successful = successful; + this.unsuccessful = unsuccessful; + } + + public CleanupSummary(ColumnFamilyStore cfs, Set<UUID> successful, Set<UUID> unsuccessful) + { + this(cfs.keyspace.getName(), cfs.name, successful, unsuccessful); + } + + public static CleanupSummary add(CleanupSummary l, CleanupSummary r) + { + Preconditions.checkArgument(l.keyspace.equals(r.keyspace)); + Preconditions.checkArgument(l.table.equals(r.table)); + + Set<UUID> unsuccessful = new HashSet<>(l.unsuccessful); + unsuccessful.addAll(r.unsuccessful); + + Set<UUID> successful = new HashSet<>(l.successful); + successful.addAll(r.successful); + successful.removeAll(unsuccessful); + + return new CleanupSummary(l.keyspace, l.table, successful, unsuccessful); + } + + private static String[] uuids2Strings(Set<UUID> uuids) + { + String[] strings = new String[uuids.size()]; + int idx = 0; + for (UUID uuid : uuids) + strings[idx++] = uuid.toString(); + return strings; + } + + private static Set<UUID> strings2Uuids(String[] strings) + { + Set<UUID> uuids = Sets.newHashSetWithExpectedSize(strings.length); + for (String string : strings) + uuids.add(UUID.fromString(string)); + + return uuids; + } + + public CompositeData toComposite() + { + Map<String, Object> values = new HashMap<>(); + values.put(COMPOSITE_NAMES[0], keyspace); + values.put(COMPOSITE_NAMES[1], table); + values.put(COMPOSITE_NAMES[2], uuids2Strings(successful)); + values.put(COMPOSITE_NAMES[3], uuids2Strings(unsuccessful)); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, values); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static CleanupSummary fromComposite(CompositeData cd) + { + Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE)); + Object[] values = cd.getAll(COMPOSITE_NAMES); + return new CleanupSummary((String) values[0], + (String) values[1], + strings2Uuids((String[]) values[2]), + strings2Uuids((String[]) values[3])); + + } +} diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java new file mode 100644 index 0000000..0c4424e --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; + +public class PendingStat +{ + private static final String[] COMPOSITE_NAMES = new String[] {"dataSize", "numSSTables", "sessions"}; + private static final OpenType<?>[] COMPOSITE_TYPES; + public static final CompositeType COMPOSITE_TYPE; + + static + { + try + { + COMPOSITE_TYPES = new OpenType[] { SimpleType.LONG, SimpleType.INTEGER, ArrayType.getArrayType(SimpleType.STRING) }; + COMPOSITE_TYPE = new CompositeType(PendingStat.class.getName(), + PendingStat.class.getSimpleName(), + COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + + + public final long dataSize; + public final int numSSTables; + public final Set<UUID> sessions; + + public PendingStat(long dataSize, int numSSTables, Set<UUID> sessions) + { + this.dataSize = dataSize; + this.numSSTables = numSSTables; + this.sessions = Collections.unmodifiableSet(sessions); + } + + public String sizeString() + { + return String.format("%s (%s sstables / %s sessions)", FileUtils.stringifyFileSize(dataSize), numSSTables, sessions.size()); + } + + public CompositeData toComposite() + { + Map<String, Object> values = new HashMap<>(); + values.put(COMPOSITE_NAMES[0], dataSize); + values.put(COMPOSITE_NAMES[1], numSSTables); + String[] sessionIds = new String[sessions.size()]; + int idx = 0; + for (UUID session : sessions) + sessionIds[idx++] = session.toString(); + values.put(COMPOSITE_NAMES[2], sessionIds); + + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, values); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static PendingStat fromComposite(CompositeData cd) + { + Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE)); + Object[] values = cd.getAll(COMPOSITE_NAMES); + Set<UUID> sessions = new HashSet<>(); + for (String session : (String[]) values[2]) + sessions.add(UUID.fromString(session)); + return new PendingStat((long) values[0], (int) values[1], sessions); + } + + public static class Builder + { + public long dataSize = 0; + public int numSSTables = 0; + public Set<UUID> sessions = new HashSet<>(); + + public Builder addSSTable(SSTableReader sstable) + { + UUID sessionID = sstable.getPendingRepair(); + if (sessionID == null) + return this; + dataSize += sstable.onDiskLength(); + sessions.add(sessionID); + numSSTables++; + return this; + } + + public Builder addStat(PendingStat stat) + { + dataSize += stat.dataSize; + numSSTables += stat.numSSTables; + sessions.addAll(stat.sessions); + return this; + } + + public PendingStat build() + { + return new PendingStat(dataSize, numSSTables, sessions); + } + } +} diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java new file mode 100644 index 0000000..9dbe0df --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.HashMap; +import java.util.Map; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +public class PendingStats +{ + + private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "total", "pending", "finalized", "failed" }; + private static final OpenType<?>[] COMPOSITE_TYPES; + private static final CompositeType COMPOSITE_TYPE; + + static + { + try + { + COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, + SimpleType.STRING, + PendingStat.COMPOSITE_TYPE, + PendingStat.COMPOSITE_TYPE, + PendingStat.COMPOSITE_TYPE}; + COMPOSITE_TYPE = new CompositeType(RepairStats.Section.class.getName(), "PendingStats", COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public final String keyspace; + public final String table; + public final PendingStat total; + public final PendingStat pending; + public final PendingStat finalized; + public final PendingStat failed; + + public PendingStats(String keyspace, String table, PendingStat pending, PendingStat finalized, PendingStat failed) + { + this.keyspace = keyspace; + this.table = table; + + this.total = new PendingStat.Builder().addStat(pending).addStat(finalized).addStat(failed).build(); + this.pending = pending; + this.finalized = finalized; + this.failed = failed; + } + + public CompositeData toComposite() + { + Map<String, Object> values = new HashMap<>(); + values.put(COMPOSITE_NAMES[0], keyspace); + values.put(COMPOSITE_NAMES[1], table); + values.put(COMPOSITE_NAMES[2], pending.toComposite()); + values.put(COMPOSITE_NAMES[3], finalized.toComposite()); + values.put(COMPOSITE_NAMES[4], failed.toComposite()); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, values); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static PendingStats fromComposite(CompositeData cd) + { + Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE)); + Object[] values = cd.getAll(COMPOSITE_NAMES); + return new PendingStats((String) values[0], + (String) values[1], + PendingStat.fromComposite((CompositeData) values[2]), + PendingStat.fromComposite((CompositeData) values[3]), + PendingStat.fromComposite((CompositeData) values[3])); + + } +} diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java new file mode 100644 index 0000000..bbb4778 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.management.openmbean.*; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import org.apache.cassandra.repair.consistent.RepairedState; + +public class RepairStats +{ + public static class Section + { + + private static final String[] COMPOSITE_NAMES = new String[] { "start", "end", "repairedAt" }; + private static final OpenType<?>[] COMPOSITE_TYPES; + private static final CompositeType COMPOSITE_TYPE; + + static + { + try + { + COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.LONG }; + COMPOSITE_TYPE = new CompositeType(Section.class.getName(), "Section", COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public final String start; + public final String end; + public final long time; + + public Section(String start, String end, long time) + { + this.start = start; + this.end = end; + this.time = time; + } + + private CompositeData toComposite() + { + Map<String, Object> values = new HashMap<>(); + values.put(COMPOSITE_NAMES[0], start); + values.put(COMPOSITE_NAMES[1], end); + values.put(COMPOSITE_NAMES[2], time); + + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, values); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + private static Section fromComposite(CompositeData cd) + { + Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE)); + Object[] values = cd.getAll(COMPOSITE_NAMES); + String start = (String) values[0]; + String end = (String) values[1]; + long time = (long) values[2]; + return new Section(start, end, time); + } + + @Override + public String toString() + { + return String.format("(%s,%s]=%s", start, end, time); + } + } + + private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "minRepaired", "maxRepaired", "sections" }; + private static final OpenType<?>[] COMPOSITE_TYPES; + private static final CompositeType COMPOSITE_TYPE; + + static + { + try + { + COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING, SimpleType.STRING, + SimpleType.LONG, SimpleType.LONG, + ArrayType.getArrayType(Section.COMPOSITE_TYPE)}; + COMPOSITE_TYPE = new CompositeType(RepairStats.class.getName(), RepairStats.class.getSimpleName(), + COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public final String keyspace; + public final String table; + public final long minRepaired; + public final long maxRepaired; + public final List<Section> sections; + + private RepairStats(String keyspace, String table, long minRepaired, long maxRepaired, List<Section> sections) + { + this.keyspace = keyspace; + this.table = table; + this.minRepaired = minRepaired; + this.maxRepaired = maxRepaired; + this.sections = sections; + } + + public static List<Section> convertSections(List<RepairedState.Section> from) + { + List<Section> to = new ArrayList<>(from.size()); + for (RepairedState.Section section : from) + { + to.add(new Section(section.range.left.toString(), section.range.right.toString(), section.repairedAt)); + } + return to; + } + + public static RepairStats fromRepairState(String keyspace, String table, RepairedState.Stats stats) + { + return new RepairStats(keyspace, table, stats.minRepaired, stats.maxRepaired, convertSections(stats.sections)); + } + + public CompositeData toComposite() + { + Map<String, Object> values = new HashMap<>(); + values.put(COMPOSITE_NAMES[0], keyspace); + values.put(COMPOSITE_NAMES[1], table); + values.put(COMPOSITE_NAMES[2], minRepaired); + values.put(COMPOSITE_NAMES[3], maxRepaired); + + CompositeData[] compositeSections = new CompositeData[sections.size()]; + for (int i=0; i<sections.size(); i++) + compositeSections[i] = sections.get(i).toComposite(); + + values.put(COMPOSITE_NAMES[4], compositeSections); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, values); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static RepairStats fromComposite(CompositeData cd) + { + Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE)); + Object[] values = cd.getAll(COMPOSITE_NAMES); + + String keyspace = (String) values[0]; + String table = (String) values[1]; + long minRepaired = (long) values[2]; + long maxRepaired = (long) values[3]; + CompositeData[] sectionData = (CompositeData[]) values[4]; + List<Section> sections = new ArrayList<>(sectionData.length); + for (CompositeData scd : sectionData) + sections.add(Section.fromComposite(scd)); + return new RepairStats(keyspace, table, minRepaired, maxRepaired, sections); + } +} diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java new file mode 100644 index 0000000..67c0244 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.utils.AbstractIterator; + +public class SchemaArgsParser implements Iterable<ColumnFamilyStore> +{ + + private final List<String> schemaArgs; + + private SchemaArgsParser(List<String> schemaArgs) + { + this.schemaArgs = schemaArgs; + } + + private static class TableIterator extends AbstractIterator<ColumnFamilyStore> + { + private final Iterator<ColumnFamilyStore> tables; + + public TableIterator(String ksName, List<String> tableNames) + { + Preconditions.checkArgument(Schema.instance.getKeyspaceMetadata(ksName) != null); + Keyspace keyspace = Keyspace.open(ksName); + + if (tableNames.isEmpty()) + { + tables = keyspace.getColumnFamilyStores().iterator(); + } + else + { + tables = Lists.newArrayList(Iterables.transform(tableNames, tn -> keyspace.getColumnFamilyStore(tn))).iterator(); + } + } + + @Override + protected ColumnFamilyStore computeNext() + { + return tables.hasNext() ? tables.next() : endOfData(); + } + } + + @Override + public Iterator<ColumnFamilyStore> iterator() + { + if (schemaArgs.isEmpty()) + { + // iterate over everything + Iterator<String> ksNames = Schema.instance.getNonLocalStrategyKeyspaces().iterator(); + + return new AbstractIterator<ColumnFamilyStore>() + { + TableIterator current = null; + protected ColumnFamilyStore computeNext() + { + for (;;) + { + if (current != null && current.hasNext()) + { + return current.next(); + } + + if (ksNames.hasNext()) + { + current = new TableIterator(ksNames.next(), Collections.emptyList()); + continue; + } + + return endOfData(); + } + } + }; + + } + else + { + return new TableIterator(schemaArgs.get(0), schemaArgs.subList(1, schemaArgs.size())); + } + } + + public static Iterable<ColumnFamilyStore> parse(List<String> schemaArgs) + { + return new SchemaArgsParser(schemaArgs); + } + + public static Iterable<ColumnFamilyStore> parse(String... schemaArgs) + { + return parse(Lists.newArrayList(schemaArgs)); + } +} diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index adcd776..f7ed052 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -57,6 +57,30 @@ public class RepairOption private static final Logger logger = LoggerFactory.getLogger(RepairOption.class); + public static Set<Range<Token>> parseRanges(String rangesStr, IPartitioner partitioner) + { + if (rangesStr == null || rangesStr.isEmpty()) + return Collections.emptySet(); + + Set<Range<Token>> ranges = new HashSet<>(); + StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); + while (tokenizer.hasMoreTokens()) + { + String[] rangeStr = tokenizer.nextToken().split(":", 2); + if (rangeStr.length < 2) + { + continue; + } + Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim()); + Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim()); + if (parsedBeginToken.equals(parsedEndToken)) + { + throw new IllegalArgumentException("Start and end tokens must be different."); + } + ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); + } + return ranges; + } /** * Construct RepairOptions object from given map of Strings. * <p> @@ -165,28 +189,10 @@ public class RepairOption } catch (NumberFormatException ignore) {} } + // ranges - String rangesStr = options.get(RANGES_KEY); - Set<Range<Token>> ranges = new HashSet<>(); - if (rangesStr != null) - { - StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); - while (tokenizer.hasMoreTokens()) - { - String[] rangeStr = tokenizer.nextToken().split(":", 2); - if (rangeStr.length < 2) - { - continue; - } - Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim()); - Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim()); - if (parsedBeginToken.equals(parsedEndToken)) - { - throw new IllegalArgumentException("Start and end tokens must be different."); - } - ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); - } - } + Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), partitioner); + boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY)); RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index e61bd38..3b13907 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -22,6 +22,7 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import javax.management.openmbean.CompositeData; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -31,7 +32,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.AbstractFuture; - import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -55,9 +55,9 @@ import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; +import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; @@ -66,14 +66,23 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.CommonRange; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.repair.consistent.CoordinatorSessions; import org.apache.cassandra.repair.consistent.LocalSessions; -import org.apache.cassandra.repair.messages.*; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; +import org.apache.cassandra.repair.consistent.admin.PendingStats; +import org.apache.cassandra.repair.consistent.admin.RepairStats; +import org.apache.cassandra.repair.consistent.RepairedState; +import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser; +import org.apache.cassandra.repair.messages.PrepareMessage; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.messages.SyncResponse; +import org.apache.cassandra.repair.messages.ValidationResponse; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; @@ -210,9 +219,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } @Override - public List<Map<String, String>> getSessions(boolean all) + public List<Map<String, String>> getSessions(boolean all, String rangesStr) { - return consistent.local.sessionInfo(all); + Set<Range<Token>> ranges = RepairOption.parseRanges(rangesStr, DatabaseDescriptor.getPartitioner()); + return consistent.local.sessionInfo(all, ranges); } @Override @@ -234,6 +244,65 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return DatabaseDescriptor.getRepairSessionSpaceInMegabytes(); } + public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString) + { + List<CompositeData> stats = new ArrayList<>(); + Collection<Range<Token>> userRanges = rangeString != null + ? RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner()) + : null; + + for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) + { + String keyspace = cfs.keyspace.getName(); + Collection<Range<Token>> ranges = userRanges != null + ? userRanges + : StorageService.instance.getLocalReplicas(keyspace).ranges(); + RepairedState.Stats cfStats = consistent.local.getRepairedStats(cfs.metadata().id, ranges); + stats.add(RepairStats.fromRepairState(keyspace, cfs.name, cfStats).toComposite()); + } + + return stats; + } + + @Override + public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString) + { + List<CompositeData> stats = new ArrayList<>(); + Collection<Range<Token>> userRanges = rangeString != null + ? RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner()) + : null; + for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) + { + String keyspace = cfs.keyspace.getName(); + Collection<Range<Token>> ranges = userRanges != null + ? userRanges + : StorageService.instance.getLocalReplicas(keyspace).ranges(); + PendingStats cfStats = consistent.local.getPendingStats(cfs.metadata().id, ranges); + stats.add(cfStats.toComposite()); + } + + return stats; + } + + @Override + public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force) + { + List<CompositeData> stats = new ArrayList<>(); + Collection<Range<Token>> userRanges = rangeString != null + ? RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner()) + : null; + for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) + { + String keyspace = cfs.keyspace.getName(); + Collection<Range<Token>> ranges = userRanges != null + ? userRanges + : StorageService.instance.getLocalReplicas(keyspace).ranges(); + CleanupSummary summary = consistent.local.cleanup(cfs.metadata().id, ranges, force); + stats.add(summary.toComposite()); + } + return stats; + } + /** * Requests repairs for the given keyspace and column families. * diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java index f4d6c48..8cffecc 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java @@ -20,12 +20,13 @@ package org.apache.cassandra.service; import java.util.List; import java.util.Map; +import javax.management.openmbean.CompositeData; public interface ActiveRepairServiceMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService"; - public List<Map<String, String>> getSessions(boolean all); + public List<Map<String, String>> getSessions(boolean all, String rangesStr); public void failSession(String session, boolean force); public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes); @@ -36,4 +37,8 @@ public interface ActiveRepairServiceMBean public int getRepairPendingCompactionRejectThreshold(); public void setRepairPendingCompactionRejectThreshold(int value); + + public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString); + public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString); + public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force); } diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index bf5e5cc..69cd04c 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -153,7 +153,6 @@ public class NodeTool ReloadSeeds.class, ResetFullQueryLog.class, Repair.class, - RepairAdmin.class, ReplayBatchlog.class, SetCacheCapacity.class, SetConcurrency.class, @@ -224,6 +223,15 @@ public class NodeTool .withDefaultCommand(CassHelp.class) .withCommand(BootstrapResume.class); + builder.withGroup("repair_admin") + .withDescription("list and fail incremental repair sessions") + .withDefaultCommand(RepairAdmin.ListCmd.class) + .withCommand(RepairAdmin.ListCmd.class) + .withCommand(RepairAdmin.CancelCmd.class) + .withCommand(RepairAdmin.CleanupDataCmd.class) + .withCommand(RepairAdmin.SummarizePendingCmd.class) + .withCommand(RepairAdmin.SummarizeRepairedCmd.class); + Cli<Consumer<INodeProbeFactory>> parser = builder.build(); int status = 0; diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java index ba3cf62..b66c32a 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java @@ -22,13 +22,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import javax.management.openmbean.CompositeData; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; + +import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.cassandra.repair.consistent.LocalSessionInfo; +import org.apache.cassandra.repair.consistent.admin.CleanupSummary; +import org.apache.cassandra.repair.consistent.admin.PendingStat; +import org.apache.cassandra.repair.consistent.admin.PendingStats; +import org.apache.cassandra.repair.consistent.admin.RepairStats; import org.apache.cassandra.service.ActiveRepairServiceMBean; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool; @@ -37,113 +47,287 @@ import org.apache.cassandra.utils.FBUtilities; /** * Supports listing and failing incremental repair sessions */ -@Command(name = "repair_admin", description = "list and fail incremental repair sessions") -public class RepairAdmin extends NodeTool.NodeToolCmd +public abstract class RepairAdmin extends NodeTool.NodeToolCmd { - @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)") - private boolean list = false; - - @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions") - private boolean all = false; + @Command(name = "list", description = "list repair sessions") + public static class ListCmd extends RepairAdmin + { + @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions") + private boolean all; - @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session") - private String cancel = null; + @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts") + private String startToken = StringUtils.EMPTY; - @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." + - " Attempting to cancel FINALIZED or FAILED sessions is an error.") - private boolean force = false; + @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends") + private String endToken = StringUtils.EMPTY; - private static final List<String> header = Lists.newArrayList("id", - "state", - "last activity", - "coordinator", - "participants", - "participants_wp"); + protected void execute(NodeProbe probe) + { + List<Map<String, String>> sessions = probe.getRepairServiceProxy().getSessions(all, getRangeString(startToken, endToken)); + if (sessions.isEmpty()) + { + System.out.println("no sessions"); + } + else + { + List<List<String>> rows = new ArrayList<>(); + rows.add(Lists.newArrayList("id", + "state", + "last activity", + "coordinator", + "participants", + "participants_wp")); + int now = FBUtilities.nowInSeconds(); + for (Map<String, String> session : sessions) + { + int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE)); + List<String> values = Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID), + session.get(LocalSessionInfo.STATE), + (now - updated) + " (s)", + session.get(LocalSessionInfo.COORDINATOR), + session.get(LocalSessionInfo.PARTICIPANTS), + session.get(LocalSessionInfo.PARTICIPANTS_WP)); + rows.add(values); + } - private List<String> sessionValues(Map<String, String> session, int now) - { - int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE)); - return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID), - session.get(LocalSessionInfo.STATE), - Integer.toString(now - updated) + " (s)", - session.get(LocalSessionInfo.COORDINATOR), - session.get(LocalSessionInfo.PARTICIPANTS), - session.get(LocalSessionInfo.PARTICIPANTS_WP)); + printTable(rows); + } + } } - - private void listSessions(ActiveRepairServiceMBean repairServiceProxy) + @Command(name = "summarize-pending", description = "report the amount of data marked pending repair for the given token " + + "range (or all replicated range if no tokens are provided") + public static class SummarizePendingCmd extends RepairAdmin { - Preconditions.checkArgument(cancel == null); - Preconditions.checkArgument(!force, "-f/--force only valid for session cancel"); - List<Map<String, String>> sessions = repairServiceProxy.getSessions(all); - if (sessions.isEmpty()) - { - System.out.println("no sessions"); + @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ") + private boolean verbose; - } - else + @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts") + private String startToken = StringUtils.EMPTY; + + @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends") + private String endToken = StringUtils.EMPTY; + + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> schemaArgs = new ArrayList<>(); + + protected void execute(NodeProbe probe) { - List<List<String>> rows = new ArrayList<>(); - rows.add(header); - int now = FBUtilities.nowInSeconds(); - for (Map<String, String> session : sessions) + List<CompositeData> cds = probe.getRepairServiceProxy().getPendingStats(schemaArgs, getRangeString(startToken, endToken)); + List<PendingStats> stats = new ArrayList<>(cds.size()); + cds.forEach(cd -> stats.add(PendingStats.fromComposite(cd))); + + stats.sort((l, r) -> { + int cmp = l.keyspace.compareTo(r.keyspace); + if (cmp != 0) + return cmp; + + return l.table.compareTo(r.table); + }); + + List<String> header = Lists.newArrayList("keyspace", "table", "total"); + if (verbose) { - rows.add(sessionValues(session, now)); + header.addAll(Lists.newArrayList("pending", "finalized", "failed")); } - // get max col widths - int[] widths = new int[header.size()]; - for (List<String> row : rows) + List<List<String>> rows = new ArrayList<>(stats.size() + 1); + rows.add(header); + + for (PendingStats stat : stats) { - assert row.size() == widths.length; - for (int i = 0; i < widths.length; i++) + List<String> row = new ArrayList<>(header.size()); + + row.add(stat.keyspace); + row.add(stat.table); + row.add(stat.total.sizeString()); + if (verbose) { - widths[i] = Math.max(widths[i], row.get(i).length()); + row.add(stat.pending.sizeString()); + row.add(stat.finalized.sizeString()); + row.add(stat.failed.sizeString()); } + rows.add(row); } - List<String> fmts = new ArrayList<>(widths.length); - for (int i = 0; i < widths.length; i++) + printTable(rows); + } + } + + @Command(name = "summarize-repaired", description = "return the most recent repairedAt timestamp for the given token range " + + "(or all replicated ranges if no tokens are provided)") + public static class SummarizeRepairedCmd extends RepairAdmin + { + @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ") + private boolean verbose = false; + + @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts") + private String startToken = StringUtils.EMPTY; + + @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends") + private String endToken = StringUtils.EMPTY; + + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> schemaArgs = new ArrayList<>(); + + protected void execute(NodeProbe probe) + { + List<CompositeData> compositeData = probe.getRepairServiceProxy().getRepairStats(schemaArgs, getRangeString(startToken, endToken)); + + if (compositeData.isEmpty()) { - fmts.add("%-" + Integer.toString(widths[i]) + "s"); + System.out.println("no stats"); + return; } + List<RepairStats> stats = new ArrayList<>(compositeData.size()); + compositeData.forEach(cd -> stats.add(RepairStats.fromComposite(cd))); + + stats.sort((l, r) -> { + int cmp = l.keyspace.compareTo(r.keyspace); + if (cmp != 0) + return cmp; + + return l.table.compareTo(r.table); + }); + + List<String> header = Lists.newArrayList("keyspace", "table", "min_repaired", "max_repaired"); + if (verbose) + header.add("detail"); - // print - for (List<String> row : rows) + List<List<String>> rows = new ArrayList<>(stats.size() + 1); + rows.add(header); + + for (RepairStats stat : stats) { - List<String> formatted = new ArrayList<>(row.size()); - for (int i = 0; i < widths.length; i++) + List<String> row = Lists.newArrayList(stat.keyspace, + stat.table, + Long.toString(stat.minRepaired), + Long.toString(stat.maxRepaired)); + if (verbose) { - formatted.add(String.format(fmts.get(i), row.get(i))); + row.add(Joiner.on(", ").join(Iterables.transform(stat.sections, RepairStats.Section::toString))); } - System.out.println(Joiner.on(" | ").join(formatted)); + rows.add(row); } + + printTable(rows); } } - private void cancelSession(ActiveRepairServiceMBean repairServiceProxy) + @Command(name = "cleanup", description = "cleans up pending data from completed sessions. " + + "This happens automatically, but the command is provided " + + "for situations where it needs to be expedited." + + " Use --force to cancel compactions that are preventing promotion") + public static class CleanupDataCmd extends RepairAdmin { - Preconditions.checkArgument(!list); - Preconditions.checkArgument(!all, "-a/--all only valid for session list"); - repairServiceProxy.failSession(cancel, force); + @Option(title = "force", name = {"-f", "--force"}, description = "Force a cleanup.") + private boolean force = false; + + @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts") + private String startToken = StringUtils.EMPTY; + + @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends") + private String endToken = StringUtils.EMPTY; + + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> schemaArgs = new ArrayList<>(); + + protected void execute(NodeProbe probe) + { + System.out.println("Cleaning up data from completed sessions..."); + List<CompositeData> compositeData = probe.getRepairServiceProxy().cleanupPending(schemaArgs, getRangeString(startToken, endToken), force); + + List<CleanupSummary> summaries = new ArrayList<>(compositeData.size()); + compositeData.forEach(cd -> summaries.add(CleanupSummary.fromComposite(cd))); + + summaries.sort((l, r) -> { + int cmp = l.keyspace.compareTo(r.keyspace); + if (cmp != 0) + return cmp; + + return l.table.compareTo(r.table); + }); + + List<String> header = Lists.newArrayList("keyspace", "table", "successful sessions", "unsuccessful sessions"); + List<List<String>> rows = new ArrayList<>(summaries.size() + 1); + rows.add(header); + + boolean hasFailures = false; + for (CleanupSummary summary : summaries) + { + List<String> row = Lists.newArrayList(summary.keyspace, + summary.table, + Integer.toString(summary.successful.size()), + Integer.toString(summary.unsuccessful.size())); + + hasFailures |= !summary.unsuccessful.isEmpty(); + rows.add(row); + } + + if (hasFailures) + System.out.println("Some tables couldn't be cleaned up completely"); + + printTable(rows); + } } - protected void execute(NodeProbe probe) + @Command(name = "cancel", description = "cancel an incremental repair session." + + " Use --force to cancel from a node other than the repair coordinator" + + " Attempting to cancel FINALIZED or FAILED sessions is an error.") + public static class CancelCmd extends RepairAdmin { - if (list && cancel != null) + @Option(title = "force", name = {"-f", "--force"}, description = "Force a cancellation.") + private boolean force = false; + + @Option(title = "session", name = {"-s", "--session"}, description = "The session to cancel", required = true) + private String sessionToCancel; + + protected void execute(NodeProbe probe) { - throw new RuntimeException("Can either list, or cancel sessions, not both"); + probe.getRepairServiceProxy().failSession(sessionToCancel, force); } - else if (cancel != null) + } + + private static void printTable(List<List<String>> rows) + { + if (rows.isEmpty()) + return; + + // get max col widths + int[] widths = new int[rows.get(0).size()]; + for (List<String> row : rows) { - cancelSession(probe.getRepairServiceProxy()); + assert row.size() == widths.length; + for (int i = 0; i < widths.length; i++) + { + widths[i] = Math.max(widths[i], row.get(i).length()); + } } - else + + List<String> fmts = new ArrayList<>(widths.length); + for (int i = 0; i < widths.length; i++) { - // default - listSessions(probe.getRepairServiceProxy()); + fmts.add("%-" + widths[i] + "s"); } + + // print + for (List<String> row : rows) + { + List<String> formatted = new ArrayList<>(row.size()); + for (int i = 0; i < widths.length; i++) + { + formatted.add(String.format(fmts.get(i), row.get(i))); + } + System.out.println(Joiner.on(" | ").join(formatted)); + } + } + + static String getRangeString(String startToken, String endToken) + { + String rangeStr = null; + if (!startToken.isEmpty() || !endToken.isEmpty()) + rangeStr = startToken + ':' + endToken; + return rangeStr; } } diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java index 29b120b..68113f0 100644 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@ -27,7 +27,9 @@ import java.util.Random; import java.util.Set; import com.google.common.base.Joiner; +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -723,4 +725,19 @@ public class RangeTest Range<Token> r1 = r(20, -5); assertNotSame(r0.compareTo(r1), r1.compareTo(r0)); } + + @Test + public void testGroupIntersect() + { + assertTrue(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(4, 6), r(20, 25)))); + assertFalse(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(6, 7), r(20, 25)))); + } + + @Test + public void testGroupSubtract() + { + Collection<Range<Token>> ranges = Sets.newHashSet(r(1, 5), r(10, 15)); + assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25)))); + assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11)))); + } } diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java index 2c47137..d57607a 100644 --- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java +++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java @@ -71,6 +71,11 @@ public abstract class AbstractRepairTest return DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v)); } + protected static Range<Token> r(int l, int r) + { + return new Range<>(t(l), t(r)); + } + protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2)); protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3)); protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5)); diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index 15fd1fc..cb420b7 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -72,6 +72,8 @@ import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*; public class LocalSessionTest extends AbstractRepairTest { + private static final UUID TID1 = UUIDGen.getTimeUUID(); + private static final UUID TID2 = UUIDGen.getTimeUUID(); static LocalSession.Builder createBuilder() { @@ -79,7 +81,7 @@ public class LocalSessionTest extends AbstractRepairTest builder.withState(PREPARING); builder.withSessionID(UUIDGen.getTimeUUID()); builder.withCoordinator(COORDINATOR); - builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID())); + builder.withUUIDTableIds(Sets.newHashSet(TID1, TID2)); builder.withRepairedAt(System.currentTimeMillis()); builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3)); builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3)); @@ -870,6 +872,7 @@ public class LocalSessionTest extends AbstractRepairTest { LocalSession.Builder builder = createBuilder(); builder.withStartedAt(started); + builder.withRepairedAt(started); builder.withLastUpdate(updated); return builder.build(); } @@ -940,11 +943,26 @@ public class LocalSessionTest extends AbstractRepairTest sessions.cleanup(); + // failed session should be gone, but finalized should not, since it hasn't been superseded Assert.assertNull(sessions.getSession(failed.sessionID)); - Assert.assertNull(sessions.getSession(finalized.sessionID)); + Assert.assertNotNull(sessions.getSession(finalized.sessionID)); Assert.assertNull(sessions.loadUnsafe(failed.sessionID)); + Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID)); + + // add a finalized superseding session + LocalSession superseding = sessionWithTime(time, time + 1); + superseding.setState(FINALIZED); + sessions.putSessionUnsafe(superseding); + + sessions.cleanup(); + + // old finalized should be removed, superseding should still be there + Assert.assertNull(sessions.getSession(finalized.sessionID)); + Assert.assertNotNull(sessions.getSession(superseding.sessionID)); + Assert.assertNull(sessions.loadUnsafe(finalized.sessionID)); + Assert.assertNotNull(sessions.loadUnsafe(superseding.sessionID)); } /** diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java new file mode 100644 index 0000000..6c42724 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.repair.AbstractRepairTest; +import org.apache.cassandra.repair.consistent.LocalSessionTest.InstrumentedLocalSessions; +import org.apache.cassandra.repair.consistent.admin.PendingStats; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZED; +import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; + +public class PendingRepairStatTest extends AbstractRepairTest +{ + private static TableMetadata cfm; + private static ColumnFamilyStore cfs; + + private static Range<Token> FULL_RANGE; + private static IPartitioner partitioner; + + static + { + DatabaseDescriptor.daemonInitialization(); + partitioner = DatabaseDescriptor.getPartitioner(); + assert partitioner instanceof ByteOrderedPartitioner; + FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), + DatabaseDescriptor.getPartitioner().getMinimumToken()); + } + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build(); + SchemaLoader.createKeyspace("coordinatorsessiontest", KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + @Before + public void setUp() throws Exception + { + cfs.enableAutoCompaction(); + } + + static LocalSession createSession() + { + LocalSession.Builder builder = LocalSession.builder(); + builder.withState(PREPARING); + builder.withSessionID(UUIDGen.getTimeUUID()); + builder.withCoordinator(COORDINATOR); + builder.withUUIDTableIds(Sets.newHashSet(cfm.id.asUUID())); + builder.withRepairedAt(System.currentTimeMillis()); + builder.withRanges(Collections.singleton(FULL_RANGE)); + builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3)); + + int now = FBUtilities.nowInSeconds(); + builder.withStartedAt(now); + builder.withLastUpdate(now); + + return builder.build(); + } + + private static SSTableReader createSSTable(int startKey, int keys) + { + Set<SSTableReader> existing = cfs.getLiveSSTables(); + assert keys > 0; + for (int i=0; i<keys; i++) + { + int key = startKey + i; + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", cfm.keyspace, cfm.name), key, key); + } + cfs.forceBlockingFlush(); + return Iterables.getOnlyElement(Sets.difference(cfs.getLiveSSTables(), existing)); + } + + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) + { + try + { + cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), repairedAt, pendingRepair, false); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Test + public void pendingRepairStats() + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + cfs.disableAutoCompaction(); + SSTableReader sstable1 = createSSTable(0, 10); + SSTableReader sstable2 = createSSTable(10, 10); + SSTableReader sstable3 = createSSTable(10, 20); + + LocalSession session1 = createSession(); + sessions.putSessionUnsafe(session1); + LocalSession session2 = createSession(); + sessions.putSessionUnsafe(session2); + + PendingStats stats; + stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE)); + Assert.assertEquals(0, stats.total.numSSTables); + + // set all sstables to pending + mutateRepaired(sstable1, UNREPAIRED_SSTABLE, session1.sessionID); + mutateRepaired(sstable2, UNREPAIRED_SSTABLE, session2.sessionID); + mutateRepaired(sstable3, UNREPAIRED_SSTABLE, session2.sessionID); + + stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE)); + Assert.assertEquals(Sets.newHashSet(session1.sessionID, session2.sessionID), stats.total.sessions); + Assert.assertEquals(3, stats.total.numSSTables); + Assert.assertEquals(3, stats.pending.numSSTables); + Assert.assertEquals(0, stats.failed.numSSTables); + Assert.assertEquals(0, stats.finalized.numSSTables); + + // set the 2 sessions to failed and finalized + session1.setState(FAILED); + sessions.save(session1); + session2.setState(FINALIZED); + sessions.save(session2); + + stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE)); + Assert.assertEquals(3, stats.total.numSSTables); + Assert.assertEquals(0, stats.pending.numSSTables); + Assert.assertEquals(1, stats.failed.numSSTables); + Assert.assertEquals(2, stats.finalized.numSSTables); + + // remove sstables from pending sets + mutateRepaired(sstable1, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR); + mutateRepaired(sstable2, session2.repairedAt, NO_PENDING_REPAIR); + mutateRepaired(sstable3, session2.repairedAt, NO_PENDING_REPAIR); + + stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE)); + Assert.assertTrue(stats.total.sessions.isEmpty()); + } +} diff --git a/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java new file mode 100644 index 0000000..db425de --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import static org.apache.cassandra.repair.consistent.RepairedState.getRepairedStats; + +public class RepairStateTest +{ + private static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + private static Range<Token> range(long left, long right) + { + return new Range<>(tk(left), tk(right)); + } + + private static List<Range<Token>> ranges(long... tokens) + { + assert tokens.length %2 == 0; + List<Range<Token>> ranges = new ArrayList<>(); + for (int i=0; i<tokens.length; i+=2) + { + ranges.add(range(tokens[i], tokens[i+1])); + + } + return ranges; + } + + private static RepairedState.Level level(Collection<Range<Token>> ranges, long repairedAt) + { + return new RepairedState.Level(ranges, repairedAt); + } + + private static RepairedState.Section sect(Range<Token> range, long repairedAt) + { + return new RepairedState.Section(range, repairedAt); + } + + private static RepairedState.Section sect(int l, int r, long time) + { + return sect(range(l, r), time); + } + + private static <T> List<T> l(T... contents) + { + return Lists.newArrayList(contents); + } + + @Test + public void mergeOverlapping() + { + RepairedState repairs = new RepairedState(); + + repairs.add(ranges(100, 300), 5); + repairs.add(ranges(200, 400), 6); + + RepairedState.State state = repairs.state(); + Assert.assertEquals(l(level(ranges(200, 400), 6), level(ranges(100, 200), 5)), state.levels); + Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 400), 6)), state.sections); + Assert.assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeSameRange() + { + RepairedState repairs = new RepairedState(); + + repairs.add(ranges(100, 400), 5); + repairs.add(ranges(100, 400), 6); + + RepairedState.State state = repairs.state(); + Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels); + Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections); + Assert.assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeLargeRange() + { + RepairedState repairs = new RepairedState(); + + repairs.add(ranges(200, 300), 5); + repairs.add(ranges(100, 400), 6); + + RepairedState.State state = repairs.state(); + Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels); + Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections); + Assert.assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeSmallRange() + { + RepairedState repairs = new RepairedState(); + + repairs.add(ranges(100, 400), 5); + repairs.add(ranges(200, 300), 6); + + RepairedState.State state = repairs.state(); + Assert.assertEquals(l(level(ranges(200, 300), 6), level(ranges(100, 200, 300, 400), 5)), state.levels); + Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 300), 6), sect(range(300, 400), 5)), state.sections); + Assert.assertEquals(ranges(100, 400), state.covered); + } + + + @Test + public void repairedAt() + { + RepairedState rs; + + // overlapping + rs = new RepairedState(); + rs.add(ranges(100, 300), 5); + rs.add(ranges(200, 400), 6); + + Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 250))); + Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 160))); + Assert.assertEquals(5, rs.minRepairedAt(ranges(100, 200))); + Assert.assertEquals(6, rs.minRepairedAt(ranges(200, 400))); + Assert.assertEquals(0, rs.minRepairedAt(ranges(200, 401))); + Assert.assertEquals(0, rs.minRepairedAt(ranges(99, 200))); + Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 450))); + Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 60))); + Assert.assertEquals(0, rs.minRepairedAt(ranges(450, 460))); + } + + @Test + public void stats() + { + Assert.assertEquals(l(sect(100, 200, 5), sect(200, 300, 0), sect(300, 400, 5)), + getRepairedStats(l(sect(100, 200, 5), sect(300, 400, 5)), ranges(100, 400))); + + Assert.assertEquals(l(sect(100, 200, 0), sect(200, 300, 5), sect(300, 400, 0)), + getRepairedStats(l(sect(200, 300, 5)), ranges(100, 400))); + + Assert.assertEquals(l(sect(200, 300, 5)), getRepairedStats(l(sect(200, 300, 5)), ranges(200, 300))); + } +} diff --git a/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java new file mode 100644 index 0000000..9d98c9d --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.consistent.admin; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.assertj.core.util.Lists; + +public class SchemaArgsParserTest +{ + private static final String KEYSPACE = "schemaargsparsertest"; + private static final int NUM_TBL = 3; + private static TableMetadata[] cfm = new TableMetadata[NUM_TBL]; + private static ColumnFamilyStore[] cfs = new ColumnFamilyStore[NUM_TBL]; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + for (int i=0; i<NUM_TBL; i++) + cfm[i] = CreateTableStatement.parse("CREATE TABLE tbl" + i + " (k INT PRIMARY KEY, v INT)", KEYSPACE).build(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm); + for (int i=0; i<NUM_TBL; i++) + cfs[i] = Schema.instance.getColumnFamilyStoreInstance(cfm[i].id); + } + + /** + * Specifying only the keyspace should return all tables in that keyspaces + */ + @Test + public void keyspaceOnly() + { + Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE)); + Assert.assertEquals(Sets.newHashSet(cfs), tables); + } + + @Test + public void someTables() + { + Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "tbl1", "tbl2")); + Assert.assertEquals(Sets.newHashSet(cfs[1], cfs[2]), tables); + } + + @Test + public void noKeyspace() + { + Set<ColumnFamilyStore> allTables = Sets.newHashSet(SchemaArgsParser.parse().iterator()); + Assert.assertTrue(allTables.containsAll(Sets.newHashSet(cfs))); + } + + @Test( expected = IllegalArgumentException.class ) + public void invalidKeyspace() + { + Sets.newHashSet(SchemaArgsParser.parse("SOME_KEYSPACE")); + } + + @Test( expected = IllegalArgumentException.class ) + public void invalidTables() + { + Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "sometable")); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org