This is an automated email from the ASF dual-hosted git repository. brandonwilliams pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit aaabf0fb74380a3bec254bfb97dd6ee97b99bb6d Merge: 06e4e81 45ad38f Author: Brandon Williams <[email protected]> AuthorDate: Wed Oct 28 08:52:11 2020 -0500 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../org/apache/cassandra/repair/RepairSession.java | 15 ++++ .../org/apache/cassandra/repair/RepairJobTest.java | 80 ++++++++++++++++++++-- 3 files changed, 91 insertions(+), 5 deletions(-) diff --cc CHANGES.txt index d04a1e5,0f016d0..9bee8ee --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ -3.0.23: +3.11.9 + * Synchronize Keyspace instance store/clear (CASSANDRA-16210) + * Fix ColumnFilter to avoid querying cells of unselected complex columns (CASSANDRA-15977) + * Fix memory leak in CompressedChunkReader (CASSANDRA-15880) + * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833) + * Avoid failing compactions with very large partitions (CASSANDRA-15164) + * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103) +Merged from 3.0: + * Fix OOM when terminating repair session (CASSANDRA-15902) * Avoid marking shutting down nodes as up after receiving gossip shutdown message (CASSANDRA-16094) * Check SSTables for latest version before dropping compact storage (CASSANDRA-16063) * Handle unexpected columns due to schema races (CASSANDRA-15899) diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java index e1dd5b3,5269182..979ecc5 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@@ -85,17 -88,20 +88,21 @@@ public class RepairJobTest extends Sche private RepairJob job; private RepairJobDesc sessionJobDesc; - // So that threads actually get recycled and we can have accurate memory accounting while testing - // memory retention from CASSANDRA-14096 private static class MeasureableRepairSession extends RepairSession { + private final CountDownLatch validationCompleteReached = new CountDownLatch(1); + + private volatile boolean simulateValidationsOutstanding; + - public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges, String keyspace, - RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, String... cfnames) + public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges, - String keyspace,RepairParallelism parallelismDegree, Set<InetAddress> endpoints, ++ String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, + long repairedAt, boolean pullRepair, String... cfnames) { - super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, cfnames); + super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames); } + // So that threads actually get recycled and we can have accurate memory accounting while testing + // memory retention from CASSANDRA-14096 protected DebuggableThreadPoolExecutor createExecutor() { DebuggableThreadPoolExecutor executor = super.createExecutor(); @@@ -247,7 -277,46 +278,46 @@@ assertEquals(2, numDifferent); } - private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... differences) + /** + * CASSANDRA-15902: Verify that repair job will be released after force shutdown on the session + */ + @Test + public void releaseThreadAfterSessionForceShutdown() throws Throwable + { + Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>(); + mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false)); + mockTrees.put(addr2, createInitialTree(false)); + mockTrees.put(addr3, createInitialTree(false)); + + List<MessageOut> observedMessages = new ArrayList<>(); + interceptRepairMessages(mockTrees, observedMessages); + + session.simulateValidationsOutstanding(); + + Thread jobThread = new Thread(() -> job.run()); + jobThread.start(); + + session.waitUntilReceivedFirstValidationComplete(); + + session.forceShutdown(new Exception("force shutdown for testing")); + + jobThread.join(TimeUnit.SECONDS.toMillis(TEST_TIMEOUT_S)); + assertFalse("expect that the job thread has been finished and not waiting on the outstanding validations forever", jobThread.isAlive()); + + // RepairJob should send out 3 x SNAPSHOTS -> 1 x VALIDATION -> done + // Only one VALIDATION because we shutdown the session after first validation + List<RepairMessage.Type> expectedTypes = new ArrayList<>(); + for (int i = 0; i < 3; i++) + expectedTypes.add(RepairMessage.Type.SNAPSHOT); + + expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST); + + assertEquals(expectedTypes, observedMessages.stream() + .map(k -> ((RepairMessage) k.payload).messageType) + .collect(Collectors.toList())); + } - - private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... differences) ++ ++ private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer... differences) { List<Integer> expectedDifferences = new ArrayList<>(Arrays.asList(differences)); List<Integer> observedDifferences = tasks.stream() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
