Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d0d30e4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d0d30e4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d0d30e4 Branch: refs/heads/cassandra-2.2 Commit: 5d0d30e4d3d587a155d42eefe23f8586cdf06d94 Parents: 225232a 9358e58 Author: Marcus Eriksson <[email protected]> Authored: Thu Jun 23 15:10:35 2016 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Thu Jun 23 15:10:35 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/ActiveRepairService.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d0d30e4/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index b366d21,620568d..8fbdf3b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,35 -1,5 +1,36 @@@ -2.1.15 +2.2.7 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) + * Validate bloom_filter_fp_chance against lowest supported + value when the table is created (CASSANDRA-11920) + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Prevent select statements with clustering key > 64k (CASSANDRA-11882) * Avoid marking too many sstables as repaired (CASSANDRA-11696) * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d0d30e4/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index e111155,4ca1e42..38804b3 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -542,12 -524,14 +542,17 @@@ public class ActiveRepairService implem * @return */ @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId) + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId) { assert marked.contains(cfId); - ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - Iterable<SSTableReader> sstables = getActiveSSTables(cfId); - if (sstables == null) ++ if (!columnFamilyStores.containsKey(cfId)) + throw new RuntimeException("Not possible to get sstables for anticompaction for " + cfId); + boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString()); + ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - for (SSTableReader sstable : isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId)) ++ Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId); ++ // we check this above - if columnFamilyStores contains the cfId sstables will not be null ++ assert sstables != null; + for (SSTableReader sstable : sstables) { Ref<SSTableReader> ref = sstable.tryRef(); if (ref == null) @@@ -559,57 -543,21 +564,59 @@@ } /** - * Marks all the unrepaired sstables as repairing unless we have already done so. + * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction * - * Any of these sstables that are still on disk are then anticompacted once the streaming and validation phases are done. + * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the + * actual filename. * * @param cfId - * @param parentSessionId used to check that we don't start multiple inc repair sessions over the same sstables + * @param parentSessionId + * @return */ - public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId) + private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId) { - if (!marked.contains(cfId)) + Set<SSTableReader> activeSSTables = new HashSet<>(); + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); ++ if (cfs == null) ++ return null; + + Set<Integer> snapshotGenerations = new HashSet<>(); + try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString())) { - List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables; - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); - if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) + for (SSTableReader sstable : snapshottedSSTables) + { + snapshotGenerations.add(sstable.descriptor.generation); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables) + if (snapshotGenerations.contains(sstable.descriptor.generation)) + activeSSTables.add(sstable); + return activeSSTables; + } + + public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId) + { + String snapshotName = parentSessionId.toString(); + if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName)) + { + Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>() { + public boolean apply(SSTableReader sstable) + { + return sstable != null && + (!isIncremental || !sstable.isRepaired()) && + !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); + } + }, true); + + if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables)) + { + columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString()); logger.error("Cannot start multiple repair sessions over the same sstables"); throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); } @@@ -618,32 -566,13 +625,35 @@@ } } + + /** + * Compares other repairing sstables *generation* to the ones we just snapshotted + * + * we compare generations since the sstables have different paths due to snapshot names + * + * @param cfId id of the column family store + * @param parentSessionId parent repair session + * @param sstables the newly snapshotted sstables + * @return + */ + private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables) + { + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); + Set<Integer> currentlyRepairingGenerations = new HashSet<>(); + Set<Integer> newRepairingGenerations = new HashSet<>(); + for (SSTableReader sstable : currentlyRepairing) + currentlyRepairingGenerations.add(sstable.descriptor.generation); + for (SSTableReader sstable : sstables) + newRepairingGenerations.add(sstable.descriptor.generation); + + return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty(); + } + private Set<SSTableReader> getActiveSSTables(UUID cfId) { - if (failed) - return Collections.emptySet(); + if (!columnFamilyStores.containsKey(cfId)) + return null; + Set<String> repairedSSTables = sstableMap.get(cfId); Set<SSTableReader> activeSSTables = new HashSet<>(); Set<String> activeSSTableNames = new HashSet<>();
