Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6ffa4b7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6ffa4b7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6ffa4b7 Branch: refs/heads/cassandra-2.2 Commit: d6ffa4b7de9ec8b66239b2c75ff860ad6e3aa77a Parents: f463fed 675591d Author: Marcus Eriksson <[email protected]> Authored: Tue May 24 07:34:56 2016 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Tue May 24 07:34:56 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 2 +- .../cassandra/service/ActiveRepairService.java | 59 ++++++++------- .../service/ActiveRepairServiceTest.java | 75 +++++++++++++++++++- 4 files changed, 110 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 795e823,fcd7c3c..af97cd1 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,22 -1,5 +1,23 @@@ -2.1.15 +2.2.7 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an expired tombstone (CASSANDRA-11834) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 21cdeae,5297ce3..1ea5aaf --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -373,26 -378,13 +373,26 @@@ public class ActiveRepairServic { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions + //in addition to other scenarios such as repairs not involving all DCs or hosts + if (!prs.isGlobal) + { + logger.info("Not a global repair, will not do anticompaction"); + removeParentRepairSession(parentRepairSession); + return Futures.immediateFuture(Collections.emptyList()); + } + assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<ListenableFuture<?>> futures = new ArrayList<>(); - for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) + // if we don't have successful repair ranges, then just skip anticompaction + if (!successfulRanges.isEmpty()) { - Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey()); - ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); + for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) + { - Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()); ++ Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey()); + ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); + futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); + } } ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures); @@@ -432,14 -424,12 +432,14 @@@ public static class ParentRepairSession { - public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); - public final Collection<Range<Token>> ranges; + private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); + private final Collection<Range<Token>> ranges; - private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>(); + public final Map<UUID, Set<String>> sstableMap = new HashMap<>(); - public final long repairedAt; + private final long repairedAt; + public final boolean isIncremental; + public final boolean isGlobal; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt) + public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) { for (ColumnFamilyStore cfs : columnFamilyStores) { @@@ -448,45 -438,48 +448,56 @@@ } this.ranges = ranges; this.repairedAt = repairedAt; + this.isGlobal = isGlobal; + this.isIncremental = isIncremental; } - public void addSSTables(UUID cfId, Set<SSTableReader> sstables) - { - sstableMap.get(cfId).addAll(sstables); - } - @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId) + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) { - Set<SSTableReader> sstables = sstableMap.get(cfId); - Iterator<SSTableReader> sstableIterator = sstables.iterator(); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - while (sstableIterator.hasNext()) + for (SSTableReader sstable : getActiveSSTables(cfId)) { - SSTableReader sstable = sstableIterator.next(); - if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists()) - { - sstableIterator.remove(); - } + Ref<SSTableReader> ref = sstable.tryRef(); + if (ref == null) + sstableMap.get(cfId).remove(sstable.getFilename()); else + references.put(sstable, ref); + } + return new Refs<>(references.build()); + } + + private Set<SSTableReader> getActiveSSTables(UUID cfId) + { + Set<String> repairedSSTables = sstableMap.get(cfId); + Set<SSTableReader> activeSSTables = new HashSet<>(); + Set<String> activeSSTableNames = new HashSet<>(); + for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables()) + { + if (repairedSSTables.contains(sstable.getFilename())) { - Ref<SSTableReader> ref = sstable.tryRef(); - if (ref == null) - sstableIterator.remove(); - else - references.put(sstable, ref); + activeSSTables.add(sstable); + activeSSTableNames.add(sstable.getFilename()); } } - return new Refs<>(references.build()); + sstableMap.put(cfId, activeSSTableNames); + return activeSSTables; } + + public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) + { + for (SSTableReader sstable : sstables) + { + sstableMap.get(cfId).add(sstable.getFilename()); + } + } + + public long getRepairedAt() + { + if (isGlobal) + return repairedAt; + return ActiveRepairService.UNREPAIRED_SSTABLE; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index dab45f9,419ea1a..b4066d7 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -1,218 -1,107 +1,289 @@@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package org.apache.cassandra.service; +import java.net.InetAddress; +import java.util.*; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.UUID; - ++import com.google.common.base.Predicate; import com.google.common.collect.Sets; - +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; + import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.io.sstable.SSTableReader; ++import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; + import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; -public class ActiveRepairServiceTest extends SchemaLoader +public class ActiveRepairServiceTest { + public static final String KEYSPACE5 = "Keyspace5"; - public static final String CF_STANDRAD1 = "Standard1"; ++ public static final String CF_STANDARD1 = "Standard1"; + public static final String CF_COUNTER = "Counter1"; - private static final String KEYSPACE1 = "Keyspace1"; - private static final String CF = "Standard1"; + public String cfname; + public ColumnFamilyStore store; + public InetAddress LOCAL, REMOTE; + + private boolean initialized; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE5, + SimpleStrategy.class, + KSMetaData.optsWithRF(2), + SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER), - SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDRAD1)); ++ SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1)); + } + + @Before + public void prepare() throws Exception + { + if (!initialized) + { + SchemaLoader.startGossiper(); + initialized = true; + + LOCAL = FBUtilities.getBroadcastAddress(); + // generate a fake endpoint for which we can spoof receiving/sending trees + REMOTE = InetAddress.getByName("127.0.0.2"); + } + + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken())); + tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE); + assert tmd.isMember(REMOTE); + } + + @Test + public void testGetNeighborsPlusOne() throws Throwable + { + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwo() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsPlusOneInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + List<InetAddress> expected = new ArrayList<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName()); + + assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, + StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), + null, hosts).iterator().next()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable + { + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + //Dont give local endpoint + Collection<String> hosts = Arrays.asList("127.0.0.3"); + ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts); + } + + Set<InetAddress> addTokens(int max) throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + Set<InetAddress> endpoints = new HashSet<>(); + for (int i = 1; i <= max; i++) + { + InetAddress endpoint = InetAddress.getByName("127.0.0." + i); + tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint); + endpoints.add(endpoint); + } + return endpoints; + } + + @Test + public void testGetActiveRepairedSSTableRefs() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = store.getUnrepairedSSTables(); + + UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, false); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + + //add all sstables to parent repair session + prs.addSSTables(store.metadata.cfId, original); + + //retrieve all sstable references from parent repair sessions + Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + refs.release(); + + //remove 1 sstable from data data tracker + Set<SSTableReader> newLiveSet = new HashSet<>(original); + Iterator<SSTableReader> it = newLiveSet.iterator(); - SSTableReader removed = it.next(); ++ final SSTableReader removed = it.next(); + it.remove(); - store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET); ++ store.getTracker().dropSSTables(new Predicate<SSTableReader>() ++ { ++ public boolean apply(SSTableReader reader) ++ { ++ return removed.equals(reader); ++ } ++ }, OperationType.COMPACTION, null); + + //retrieve sstable references from parent repair session again - removed sstable must not be present + refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(newLiveSet, retrieved); + assertFalse(retrieved.contains(removed)); + refs.release(); + } + + private ColumnFamilyStore prepareColumnFamilyStore() + { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); - store.truncateBlocking(); ++ Keyspace keyspace = Keyspace.open(KEYSPACE5); ++ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1); + store.disableAutoCompaction(); + long timestamp = System.currentTimeMillis(); + //create 10 sstables + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); - Mutation rm = new Mutation(KEYSPACE1, key.getKey()); ++ Mutation rm = new Mutation(KEYSPACE5, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add("Standard1", Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + 0); + rm.apply(); + store.forceBlockingFlush(); + } + return store; + } }
