Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 129b1193e -> 675591d1c refs/heads/cassandra-2.2 f463fed23 -> d6ffa4b7d refs/heads/cassandra-3.0 9904c628a -> d572ab0ac refs/heads/cassandra-3.7 1f60143bb -> b2a9c5634 refs/heads/trunk 9b47dd50f -> 59896cb82
Avoid holding SSTableReaders for duration of incremental repair Patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11739 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/675591d1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/675591d1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/675591d1 Branch: refs/heads/cassandra-2.1 Commit: 675591d1c55805c13db95692c78f5feb63538eb5 Parents: 129b119 Author: Paulo Motta <[email protected]> Authored: Tue May 17 17:29:50 2016 -0300 Committer: Marcus Eriksson <[email protected]> Committed: Tue May 24 07:25:44 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 2 +- .../cassandra/service/ActiveRepairService.java | 53 +++++---- .../service/ActiveRepairServiceTest.java | 107 +++++++++++++++++++ 4 files changed, 140 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7a0ccba..fcd7c3c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an expired tombstone (CASSANDRA-11834) http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 96d873f..5af63fe 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean /** * Make sure the {validatedForRepair} are marked for compaction before calling this. * - * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)). + * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)). * * @param cfs * @param ranges Ranges that the repair was carried out on http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 732267e..5297ce3 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; @@ -42,7 +41,6 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; @@ -321,7 +319,7 @@ public class ActiveRepairService Set<SSTableReader> repairing = new HashSet<>(); for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet()) { - Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId); + Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(cfId); if (sstables != null && !entry.getKey().equals(parentRepairSession)) repairing.addAll(sstables); } @@ -384,7 +382,7 @@ public class ActiveRepairService List<ListenableFuture<?>> futures = new ArrayList<>(); for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { - Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()); + Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey()); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); } @@ -428,7 +426,7 @@ public class ActiveRepairService { public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); public final Collection<Range<Token>> ranges; - public final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>(); + public final Map<UUID, Set<String>> sstableMap = new HashMap<>(); public final long repairedAt; public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt) @@ -436,39 +434,50 @@ public class ActiveRepairService for (ColumnFamilyStore cfs : columnFamilyStores) { this.columnFamilyStores.put(cfs.metadata.cfId, cfs); - sstableMap.put(cfs.metadata.cfId, new HashSet<SSTableReader>()); + sstableMap.put(cfs.metadata.cfId, new HashSet<String>()); } this.ranges = ranges; this.repairedAt = repairedAt; } - public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId) + @SuppressWarnings("resource") + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) { - Set<SSTableReader> sstables = sstableMap.get(cfId); - Iterator<SSTableReader> sstableIterator = sstables.iterator(); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - while (sstableIterator.hasNext()) + for (SSTableReader sstable : getActiveSSTables(cfId)) { - SSTableReader sstable = sstableIterator.next(); - if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists()) - { - sstableIterator.remove(); - } + Ref<SSTableReader> ref = sstable.tryRef(); + if (ref == null) + sstableMap.get(cfId).remove(sstable.getFilename()); else + references.put(sstable, ref); + } + return new Refs<>(references.build()); + } + + private Set<SSTableReader> getActiveSSTables(UUID cfId) + { + Set<String> repairedSSTables = sstableMap.get(cfId); + Set<SSTableReader> activeSSTables = new HashSet<>(); + Set<String> activeSSTableNames = new HashSet<>(); + for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables()) + { + if (repairedSSTables.contains(sstable.getFilename())) { - Ref<SSTableReader> ref = sstable.tryRef(); - if (ref == null) - sstableIterator.remove(); - else - references.put(sstable, ref); + activeSSTables.add(sstable); + activeSSTableNames.add(sstable.getFilename()); } } - return new Refs<>(references.build()); + sstableMap.put(cfId, activeSSTableNames); + return activeSSTables; } public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) { - sstableMap.get(cfId).addAll(sstables); + for (SSTableReader sstable : sstables) + { + sstableMap.get(cfId).add(sstable.getFilename()); + } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java new file mode 100644 index 0000000..419ea1a --- /dev/null +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Sets; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.concurrent.Refs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class ActiveRepairServiceTest extends SchemaLoader +{ + + private static final String KEYSPACE1 = "Keyspace1"; + private static final String CF = "Standard1"; + + @Test + public void testGetActiveRepairedSSTableRefs() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = store.getUnrepairedSSTables(); + + UUID prsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + + //add all sstables to parent repair session + prs.addSSTables(store.metadata.cfId, original); + + //retrieve all sstable references from parent repair sessions + Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + refs.release(); + + //remove 1 sstable from data data tracker + Set<SSTableReader> newLiveSet = new HashSet<>(original); + Iterator<SSTableReader> it = newLiveSet.iterator(); + SSTableReader removed = it.next(); + it.remove(); + store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET); + + //retrieve sstable references from parent repair session again - removed sstable must not be present + refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(newLiveSet, retrieved); + assertFalse(retrieved.contains(removed)); + refs.release(); + } + + private ColumnFamilyStore prepareColumnFamilyStore() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.truncateBlocking(); + store.disableAutoCompaction(); + long timestamp = System.currentTimeMillis(); + //create 10 sstables + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + for (int j = 0; j < 10; j++) + rm.add("Standard1", Util.cellname(Integer.toString(j)), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + 0); + rm.apply(); + store.forceBlockingFlush(); + } + return store; + } +}
