Repository: cassandra Updated Branches: refs/heads/trunk ba87ab4e9 -> 9c3354e32
Fix race / ref leak in PendingRepairManager Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13751 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c3354e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c3354e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c3354e3 Branch: refs/heads/trunk Commit: 9c3354e3211c6a3f3982e87477e156c29cd9b7ea Parents: ba87ab4 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Tue Aug 8 10:32:35 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Aug 10 12:01:00 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 29 ++--------------- .../compaction/CompactionStrategyManager.java | 25 +++++++++++--- .../compaction/LeveledCompactionStrategy.java | 10 +----- .../db/compaction/PendingRepairManager.java | 34 +++++++++++++++++--- .../cassandra/io/sstable/ISSTableScanner.java | 34 ++++++++++++++++++++ .../db/compaction/PendingRepairManagerTest.java | 24 ++++++++++++++ 7 files changed, 113 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 849848f..e997b50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751) * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615) * Improve sstablemetadata output (CASSANDRA-11483) * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 5333683..f1f42a7 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -293,15 +293,7 @@ public abstract class AbstractCompactionStrategy } catch (Throwable t) { - try - { - new ScannerList(scanners).close(); - } - catch (Throwable t2) - { - t.addSuppressed(t2); - } - throw t; + ISSTableScanner.closeAllAndPropagate(scanners, t); } return new ScannerList(scanners); } @@ -385,24 +377,7 @@ public abstract class AbstractCompactionStrategy public void close() { - Throwable t = null; - for (ISSTableScanner scanner : scanners) - { - try - { - scanner.close(); - } - catch (Throwable t2) - { - JVMStabilityInspector.inspectThrowable(t2); - if (t == null) - t = t2; - else - t.addSuppressed(t2); - } - } - if (t != null) - throw Throwables.propagate(t); + ISSTableScanner.closeAllAndPropagate(scanners, null); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index e58ccc2..6342a1b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -21,7 +21,6 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.function.Supplier; @@ -735,7 +734,7 @@ public class CompactionStrategyManager implements INotificationConsumer * @return */ @SuppressWarnings("resource") - public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { assert repaired.size() == unrepaired.size(); assert repaired.size() == pendingRepairs.size(); @@ -781,13 +780,31 @@ public class CompactionStrategyManager implements INotificationConsumer if (!unrepairedSSTables.get(i).isEmpty()) scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners); } - - return new AbstractCompactionStrategy.ScannerList(scanners); + } + catch (PendingRepairManager.IllegalSSTableArgumentException e) + { + ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e)); } finally { readLock.unlock(); } + return new AbstractCompactionStrategy.ScannerList(scanners); + } + + public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + { + while (true) + { + try + { + return maybeGetScanners(sstables, ranges); + } + catch (ConcurrentModificationException e) + { + logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners"); + } + } } public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 4f11a03..8086be9 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -307,15 +307,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } catch (Throwable t) { - try - { - new ScannerList(scanners).close(); - } - catch (Throwable t2) - { - t.addSuppressed(t2); - } - throw t; + ISSTableScanner.closeAllAndPropagate(scanners, t); } return new ScannerList(scanners); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index afde263..183af7a 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -64,6 +64,17 @@ class PendingRepairManager private final CompactionParams params; private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of(); + /** + * Indicates we're being asked to do something with an sstable that isn't marked pending repair + */ + public static class IllegalSSTableArgumentException extends IllegalArgumentException + { + public IllegalSSTableArgumentException(String s) + { + super(s); + } + } + PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params) { this.cfs = cfs; @@ -88,6 +99,7 @@ class PendingRepairManager AbstractCompactionStrategy getOrCreate(UUID id) { + checkPendingID(id); assert id != null; AbstractCompactionStrategy strategy = get(id); if (strategy == null) @@ -107,9 +119,16 @@ class PendingRepairManager return strategy; } + private static void checkPendingID(UUID pendingID) + { + if (pendingID == null) + { + throw new IllegalSSTableArgumentException("sstable is not pending repair"); + } + } + AbstractCompactionStrategy getOrCreate(SSTableReader sstable) { - assert sstable.isPendingRepair(); return getOrCreate(sstable.getSSTableMetadata().pendingRepair); } @@ -352,14 +371,21 @@ class PendingRepairManager for (SSTableReader sstable : sstables) { UUID sessionID = sstable.getSSTableMetadata().pendingRepair; - assert sessionID != null; + checkPendingID(sessionID); sessionSSTables.computeIfAbsent(sessionID, k -> new HashSet<>()).add(sstable); } Set<ISSTableScanner> scanners = new HashSet<>(sessionSSTables.size()); - for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet()) + try + { + for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet()) + { + scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners); + } + } + catch (Throwable t) { - scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners); + ISSTableScanner.closeAllAndPropagate(scanners, t); } return scanners; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java index 2dff34e..1c1d74b 100644 --- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java @@ -19,7 +19,12 @@ package org.apache.cassandra.io.sstable; +import java.util.Collection; + +import com.google.common.base.Throwables; + import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.utils.JVMStabilityInspector; /** * An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be @@ -32,4 +37,33 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator public long getCurrentPosition(); public long getBytesScanned(); public String getBackingFiles(); + + public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable) + { + for (ISSTableScanner scanner: scanners) + { + try + { + scanner.close(); + } + catch (Throwable t2) + { + JVMStabilityInspector.inspectThrowable(t2); + if (throwable == null) + { + throwable = t2; + } + else + { + throwable.addSuppressed(t2); + } + } + } + + if (throwable != null) + { + Throwables.propagate(throwable); + } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java index a173b4b..93b68b5 100644 --- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@ -240,4 +240,28 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest tasks.stream().forEach(t -> t.transaction.abort()); } } + + /** + * Tests that a IllegalSSTableArgumentException is thrown if we try to get + * scanners for an sstable that isn't pending repair + */ + @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class) + public void getScannersInvalidSSTable() throws Exception + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + SSTableReader sstable = makeSSTable(true); + prm.getScanners(Collections.singleton(sstable), Collections.singleton(RANGE1)); + } + + /** + * Tests that a IllegalSSTableArgumentException is thrown if we try to get + * scanners for an sstable that isn't pending repair + */ + @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class) + public void getOrCreateInvalidSSTable() throws Exception + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + SSTableReader sstable = makeSSTable(true); + prm.getOrCreate(sstable); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org