Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d572ab0a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d572ab0a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d572ab0a Branch: refs/heads/trunk Commit: d572ab0ac86b4e635c2dd83e56926cc6192f1c2a Parents: 9904c62 d6ffa4b Author: Marcus Eriksson <marc...@apache.org> Authored: Tue May 24 07:42:07 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue May 24 07:42:07 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../db/compaction/CompactionManager.java | 2 +- .../cassandra/service/ActiveRepairService.java | 59 ++++++++++------- .../service/ActiveRepairServiceTest.java | 66 +++++++++++++++++++- 4 files changed, 102 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 608d6c9,af97cd1..69e8c5d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,36 -1,7 +1,38 @@@ -2.2.7 - * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) +3.0.7 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: + * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) - * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) ++Merged from 2.1: ++ * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) + + +3.0.6 + * Disallow creating view with a static column (CASSANDRA-11602) + * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593) + * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618) + * Fix queries with filtering on counter columns (CASSANDRA-11629) + * Improve tombstone printing in sstabledump (CASSANDRA-11655) + * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669) + * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600) + * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654) + * Ignore all LocalStrategy keyspaces for streaming and other related + operations (CASSANDRA-11627) + * Ensure columnfilter covers indexed columns for thrift 2i queries (CASSANDRA-11523) + * Only open one sstable scanner per sstable (CASSANDRA-11412) + * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410) + * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485) + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) + * Notify indexers of expired rows during compaction (CASSANDRA-11329) + * Properly respond with ProtocolError when a v1/v2 native protocol + header is received (CASSANDRA-11464) + * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) +Merged from 2.2: + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) * Exit JVM if JMX server fails to startup (CASSANDRA-11540) * Produce a heap dump when exiting on OOM (CASSANDRA-9861) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 6c75149,1ea5aaf..5aac886 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -446,39 -448,50 +446,50 @@@ public class ActiveRepairServic } this.ranges = ranges; this.repairedAt = repairedAt; - this.isGlobal = isGlobal; this.isIncremental = isIncremental; + this.isGlobal = isGlobal; } - public void addSSTables(UUID cfId, Set<SSTableReader> sstables) - { - sstableMap.get(cfId).addAll(sstables); - } - @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId) + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) { - Set<SSTableReader> sstables = sstableMap.get(cfId); - Iterator<SSTableReader> sstableIterator = sstables.iterator(); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - while (sstableIterator.hasNext()) + for (SSTableReader sstable : getActiveSSTables(cfId)) { - SSTableReader sstable = sstableIterator.next(); - if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists()) - { - sstableIterator.remove(); - } + Ref<SSTableReader> ref = sstable.tryRef(); + if (ref == null) + sstableMap.get(cfId).remove(sstable.getFilename()); else + references.put(sstable, ref); + } + return new Refs<>(references.build()); + } + + private Set<SSTableReader> getActiveSSTables(UUID cfId) + { + Set<String> repairedSSTables = sstableMap.get(cfId); + Set<SSTableReader> activeSSTables = new HashSet<>(); + Set<String> activeSSTableNames = new HashSet<>(); + for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables()) + { + if (repairedSSTables.contains(sstable.getFilename())) { - Ref<SSTableReader> ref = sstable.tryRef(); - if (ref == null) - sstableIterator.remove(); - else - references.put(sstable, ref); + activeSSTables.add(sstable); + activeSSTableNames.add(sstable.getFilename()); } } - return new Refs<>(references.build()); + sstableMap.put(cfId, activeSSTableNames); + return activeSSTables; } + + public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) + { + for (SSTableReader sstable : sstables) + { + sstableMap.get(cfId).add(sstable.getFilename()); + } + } + public long getRepairedAt() { if (isGlobal) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d572ab0a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index a61a33e,b4066d7..bd761db --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -27,18 -28,27 +27,23 @@@ import org.junit.BeforeClass import org.junit.Test; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; ++import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; public class ActiveRepairServiceTest { @@@ -57,9 -67,10 +62,9 @@@ { SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE5, - SimpleStrategy.class, - KSMetaData.optsWithRF(2), + KeyspaceParams.simple(2), SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER), - SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDRAD1)); + SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1)); } @Before @@@ -213,4 -224,66 +218,61 @@@ } return endpoints; } + + @Test + public void testGetActiveRepairedSSTableRefs() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); ++ Set<SSTableReader> original = store.getLiveSSTables(); + + UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, false); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, 0, false); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + + //add all sstables to parent repair session + prs.addSSTables(store.metadata.cfId, original); + + //retrieve all sstable references from parent repair sessions + Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + refs.release(); + + //remove 1 sstable from data data tracker + Set<SSTableReader> newLiveSet = new HashSet<>(original); + Iterator<SSTableReader> it = newLiveSet.iterator(); + final SSTableReader removed = it.next(); + it.remove(); - store.getTracker().dropSSTables(new Predicate<SSTableReader>() ++ store.getTracker().dropSSTables(new com.google.common.base.Predicate<SSTableReader>() + { + public boolean apply(SSTableReader reader) + { + return removed.equals(reader); + } + }, OperationType.COMPACTION, null); + + //retrieve sstable references from parent repair session again - removed sstable must not be present + refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(newLiveSet, retrieved); + assertFalse(retrieved.contains(removed)); + refs.release(); + } + + private ColumnFamilyStore prepareColumnFamilyStore() + { + Keyspace keyspace = Keyspace.open(KEYSPACE5); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1); + store.disableAutoCompaction(); - long timestamp = System.currentTimeMillis(); - //create 10 sstables + for (int i = 0; i < 10; i++) + { - DecoratedKey key = Util.dk(Integer.toString(i)); - Mutation rm = new Mutation(KEYSPACE5, key.getKey()); - for (int j = 0; j < 10; j++) - rm.add("Standard1", Util.cellname(Integer.toString(j)), - ByteBufferUtil.EMPTY_BYTE_BUFFER, - timestamp, - 0); - rm.apply(); - store.forceBlockingFlush(); ++ new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), Integer.toString(i)) ++ .clustering("c") ++ .add("val", "val") ++ .build() ++ .applyUnsafe(); + } ++ store.forceBlockingFlush(); + return store; + } }