Repository: cassandra Updated Branches: refs/heads/trunk 478c1a9fd -> c5a7fcaa8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 950966f..b60088c 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.base.Predicate; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; @@ -47,7 +47,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailureReason; @@ -59,7 +59,6 @@ import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; @@ -549,6 +548,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai */ public static class ParentRepairSession { + private final Keyspace keyspace; private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); private final Collection<Range<Token>> ranges; public final boolean isIncremental; @@ -560,10 +560,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) { this.coordinator = coordinator; + Set<Keyspace> keyspaces = new HashSet<>(); for (ColumnFamilyStore cfs : columnFamilyStores) { + keyspaces.add(cfs.keyspace); this.columnFamilyStores.put(cfs.metadata.id, cfs); } + + Preconditions.checkArgument(keyspaces.size() == 1, "repair sessions cannot operate on multiple keyspaces"); + this.keyspace = Iterables.getOnlyElement(keyspaces); + this.ranges = ranges; this.repairedAt = repairedAt; this.isIncremental = isIncremental; @@ -576,42 +582,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return previewKind != PreviewKind.NONE; } - public Predicate<SSTableReader> getPreviewPredicate() - { - switch (previewKind) - { - case ALL: - return (s) -> true; - case REPAIRED: - return (s) -> s.isRepaired(); - case UNREPAIRED: - return (s) -> !s.isRepaired(); - default: - throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind); - } - } - - public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId) + public Collection<ColumnFamilyStore> getColumnFamilyStores() { - String snapshotName = parentSessionId.toString(); - if (!columnFamilyStores.get(tableId).snapshotExists(snapshotName)) - { - Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(tableId).snapshot(snapshotName, new Predicate<SSTableReader>() - { - public boolean apply(SSTableReader sstable) - { - return sstable != null && - (!isIncremental || !sstable.isRepaired()) && - !(sstable.metadata().isIndex()) && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); - } - }, true, false); - } + return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build(); } - public Collection<ColumnFamilyStore> getColumnFamilyStores() + public Keyspace getKeyspace() { - return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build(); + return keyspace; } public Set<TableId> getTableIds() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java deleted file mode 100644 index 8290adf..0000000 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.compaction; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Set; -import java.util.UUID; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.cql3.statements.CreateTableStatement; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.repair.RepairJobDesc; -import org.apache.cassandra.repair.Validator; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; - -/** - * Tests correct sstables are returned from CompactionManager.getSSTablesForValidation - * for consistent, legacy incremental, and full repairs - */ -public class CompactionManagerGetSSTablesForValidationTest -{ - private String ks; - private static final String tbl = "tbl"; - private ColumnFamilyStore cfs; - private static InetAddressAndPort coordinator; - - private static Token MT; - - private SSTableReader repaired; - private SSTableReader unrepaired; - private SSTableReader pendingRepair; - - private UUID sessionID; - private RepairJobDesc desc; - - @BeforeClass - public static void setupClass() throws Exception - { - SchemaLoader.prepareServer(); - coordinator = InetAddressAndPort.getByName("10.0.0.1"); - MT = DatabaseDescriptor.getPartitioner().getMinimumToken(); - } - - @Before - public void setup() throws Exception - { - ks = "ks_" + System.currentTimeMillis(); - TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); - SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); - cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); - } - - private void makeSSTables() - { - for (int i=0; i<3; i++) - { - QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i); - cfs.forceBlockingFlush(); - } - Assert.assertEquals(3, cfs.getLiveSSTables().size()); - - } - - private void registerRepair(boolean incremental) throws Exception - { - sessionID = UUIDGen.getTimeUUID(); - Range<Token> range = new Range<>(MT, MT); - ActiveRepairService.instance.registerParentRepairSession(sessionID, - coordinator, - Lists.newArrayList(cfs), - Sets.newHashSet(range), - incremental, - incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE, - true, - PreviewKind.NONE); - desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range)); - } - - private void modifySSTables() throws Exception - { - Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator(); - - repaired = iter.next(); - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null); - repaired.reloadSSTableMetadata(); - - pendingRepair = iter.next(); - pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID); - pendingRepair.reloadSSTableMetadata(); - - unrepaired = iter.next(); - - Assert.assertFalse(iter.hasNext()); - } - - @Test - public void consistentRepair() throws Exception - { - makeSSTables(); - registerRepair(true); - modifySSTables(); - - // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true, PreviewKind.NONE); - Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); - Assert.assertNotNull(sstables); - Assert.assertEquals(1, sstables.size()); - Assert.assertTrue(sstables.contains(pendingRepair)); - } - - @Test - public void legacyIncrementalRepair() throws Exception - { - makeSSTables(); - registerRepair(true); - modifySSTables(); - - // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); - Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); - Assert.assertNotNull(sstables); - Assert.assertEquals(2, sstables.size()); - Assert.assertTrue(sstables.contains(pendingRepair)); - Assert.assertTrue(sstables.contains(unrepaired)); - } - - @Test - public void fullRepair() throws Exception - { - makeSSTables(); - registerRepair(false); - modifySSTables(); - - // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); - Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); - Assert.assertNotNull(sstables); - Assert.assertEquals(3, sstables.size()); - Assert.assertTrue(sstables.contains(pendingRepair)); - Assert.assertTrue(sstables.contains(unrepaired)); - Assert.assertTrue(sstables.contains(repaired)); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 567984d..f7e9b90 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.repair.ValidationManager; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; @@ -204,7 +205,8 @@ public class LeveledCompactionStrategyTest PreviewKind.NONE); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), gcBefore, PreviewKind.NONE); - CompactionManager.instance.submitValidation(cfs, validator).get(); + + ValidationManager.instance.submitValidation(cfs, validator).get(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java new file mode 100644 index 0000000..365ad7e --- /dev/null +++ b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.repair; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.Validator; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.db.repair.CassandraValidationIterator.getSSTablesToValidate; + +/** + * Tests correct sstables are returned from CompactionManager.getSSTablesForValidation + * for consistent, legacy incremental, and full repairs + */ +public class CompactionManagerGetSSTablesForValidationTest +{ + private String ks; + private static final String tbl = "tbl"; + private ColumnFamilyStore cfs; + private static InetAddressAndPort coordinator; + + private static Token MT; + + private SSTableReader repaired; + private SSTableReader unrepaired; + private SSTableReader pendingRepair; + + private UUID sessionID; + private RepairJobDesc desc; + + @BeforeClass + public static void setupClass() throws Exception + { + SchemaLoader.prepareServer(); + coordinator = InetAddressAndPort.getByName("10.0.0.1"); + MT = DatabaseDescriptor.getPartitioner().getMinimumToken(); + } + + @Before + public void setup() throws Exception + { + ks = "ks_" + System.currentTimeMillis(); + TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + private void makeSSTables() + { + for (int i=0; i<3; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i); + cfs.forceBlockingFlush(); + } + Assert.assertEquals(3, cfs.getLiveSSTables().size()); + + } + + private void registerRepair(boolean incremental) throws Exception + { + sessionID = UUIDGen.getTimeUUID(); + Range<Token> range = new Range<>(MT, MT); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + coordinator, + Lists.newArrayList(cfs), + Sets.newHashSet(range), + incremental, + incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE, + true, + PreviewKind.NONE); + desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range)); + } + + private void modifySSTables() throws Exception + { + Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator(); + + repaired = iter.next(); + repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null); + repaired.reloadSSTableMetadata(); + + pendingRepair = iter.next(); + pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID); + pendingRepair.reloadSSTableMetadata(); + + unrepaired = iter.next(); + + Assert.assertFalse(iter.hasNext()); + } + + @Test + public void consistentRepair() throws Exception + { + makeSSTables(); + registerRepair(true); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true, PreviewKind.NONE); + Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); + Assert.assertNotNull(sstables); + Assert.assertEquals(1, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + } + + @Test + public void legacyIncrementalRepair() throws Exception + { + makeSSTables(); + registerRepair(true); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); + Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); + Assert.assertNotNull(sstables); + Assert.assertEquals(2, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + Assert.assertTrue(sstables.contains(unrepaired)); + } + + @Test + public void fullRepair() throws Exception + { + makeSSTables(); + registerRepair(false); + modifySSTables(); + + // get sstables for repair + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); + Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); + Assert.assertNotNull(sstables); + Assert.assertEquals(3, sstables.size()); + Assert.assertTrue(sstables.contains(pendingRepair)); + Assert.assertTrue(sstables.contains(unrepaired)); + Assert.assertTrue(sstables.contains(repaired)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java new file mode 100644 index 0000000..269a725 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class PendingAntiCompactionTest +{ + private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class); + private static final Collection<Range<Token>> FULL_RANGE; + static + { + DatabaseDescriptor.daemonInitialization(); + Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); + FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken)); + } + + private String ks; + private final String tbl = "tbl"; + private TableMetadata cfm; + private ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + } + + @Before + public void setup() + { + ks = "ks_" + System.currentTimeMillis(); + cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); + SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); + cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } + + private void makeSSTables(int num) + { + for (int i = 0; i < num; i++) + { + int val = i * 2; // multiplied to prevent ranges from overlapping + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val, val); + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val+1, val+1); + cfs.forceBlockingFlush(); + } + Assert.assertEquals(num, cfs.getLiveSSTables().size()); + } + + private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback + { + public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) + { + super(parentRepairSession, ranges); + } + + Set<TableId> submittedCompactions = new HashSet<>(); + + ListenableFuture<?> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result) + { + submittedCompactions.add(result.cfs.metadata.id); + result.abort(); // prevent ref leak complaints + return ListenableFutureTask.create(() -> {}, null); + } + } + + /** + * verify the pending anti compaction happy path + */ + @Test + public void successCase() throws Exception + { + Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass()); + cfs.disableAutoCompaction(); + + // create 2 sstables, one that will be split, and another that will be moved + for (int i = 0; i < 8; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + for (int i = 8; i < 12; i++) + { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); + } + cfs.forceBlockingFlush(); + Assert.assertEquals(2, cfs.getLiveSSTables().size()); + + Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6)); + Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16)); + List<ColumnFamilyStore> tables = Lists.newArrayList(cfs); + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(left, right)); + + // create a session so the anti compaction can fine it + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE); + + PendingAntiCompaction pac; + ExecutorService executor = Executors.newSingleThreadExecutor(); + try + { + pac = new PendingAntiCompaction(sessionID, tables, ranges, executor); + pac.run().get(); + } + finally + { + executor.shutdown(); + } + + Assert.assertEquals(3, cfs.getLiveSSTables().size()); + int pendingRepair = 0; + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + if (sstable.isPendingRepair()) + pendingRepair++; + } + Assert.assertEquals(2, pendingRepair); + } + + @Test + public void acquisitionSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(6); + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + List<SSTableReader> expected = sstables.subList(0, 3); + Collection<Range<Token>> ranges = new HashSet<>(); + for (SSTableReader sstable : expected) + { + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + } + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID()); + + logger.info("SSTables: {}", sstables); + logger.info("Expected: {}", expected); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(3, result.txn.originals().size()); + for (SSTableReader sstable : expected) + { + logger.info("Checking {}", sstable); + Assert.assertTrue(result.txn.originals().contains(sstable)); + } + + Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + result.abort(); + } + + @Test + public void repairedSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + Assert.assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + Assert.assertTrue(repaired.intersects(FULL_RANGE)); + Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); + + repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null); + repaired.reloadSSTableMetadata(); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(1, result.txn.originals().size()); + Assert.assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // release sstable refs + } + + @Test + public void pendingRepairSSTablesAreNotAcquired() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); + Assert.assertEquals(2, sstables.size()); + SSTableReader repaired = sstables.get(0); + SSTableReader unrepaired = sstables.get(1); + Assert.assertTrue(repaired.intersects(FULL_RANGE)); + Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); + + repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID()); + repaired.reloadSSTableMetadata(); + Assert.assertTrue(repaired.isPendingRepair()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + logger.info("Originals: {}", result.txn.originals()); + Assert.assertEquals(1, result.txn.originals().size()); + Assert.assertTrue(result.txn.originals().contains(unrepaired)); + result.abort(); // releases sstable refs + } + + @Test + public void pendingRepairNoSSTablesExist() throws Exception + { + cfs.disableAutoCompaction(); + + Assert.assertEquals(0, cfs.getLiveSSTables().size()); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + result.abort(); // There's nothing to release, but we should exit cleanly + } + + /** + * anti compaction task should be submitted if everything is ok + */ + @Test + public void callbackSuccess() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result)); + + Assert.assertEquals(1, cb.submittedCompactions.size()); + Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); + } + + /** + * If one of the supplied AcquireResults is null, either an Exception was thrown, or + * we couldn't get a transaction for the sstables. In either case we need to cancel the repair, and release + * any sstables acquired for other tables + */ + @Test + public void callbackNullResult() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, null)); + + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state()); + } + + /** + * If an AcquireResult has a null txn, there were no sstables to acquire references + * for, so no anti compaction should have been submitted. + */ + @Test + public void callbackNullTxn() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + Assert.assertNotNull(result); + + ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id); + PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null); + + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + Assert.assertTrue(cb.submittedCompactions.isEmpty()); + cb.apply(Lists.newArrayList(result, fakeResult)); + + Assert.assertEquals(1, cb.submittedCompactions.size()); + Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); + Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id)); + } + + + @Test + public void singleAnticompaction() throws Exception + { + cfs.disableAutoCompaction(); + makeSSTables(2); + + PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); + PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); + UUID sessionID = UUIDGen.getTimeUUID(); + ActiveRepairService.instance.registerParentRepairSession(sessionID, + InetAddressAndPort.getByName("127.0.0.1"), + Lists.newArrayList(cfs), + FULL_RANGE, + true,0, + true, + PreviewKind.NONE); + CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn, + ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 7c3dd27..3b582a9 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -199,7 +199,7 @@ public class ValidatorTest final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE); - CompactionManager.instance.submitValidation(cfs, validator); + ValidationManager.instance.submitValidation(cfs, validator); MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index 5fa43a9..df51444 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.repair.consistent; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,8 +41,11 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.KeyspaceRepairManager; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; @@ -128,18 +132,20 @@ public class LocalSessionTest extends AbstractRepairTest sentMessages.get(destination).add(message); } - SettableFuture<Object> pendingAntiCompactionFuture = null; - boolean submitPendingAntiCompactionCalled = false; - ListenableFuture submitPendingAntiCompaction(LocalSession session, ExecutorService executor) + SettableFuture<Object> prepareSessionFuture = null; + boolean prepareSessionCalled = false; + + @Override + ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) { - submitPendingAntiCompactionCalled = true; - if (pendingAntiCompactionFuture != null) + prepareSessionCalled = true; + if (prepareSessionFuture != null) { - return pendingAntiCompactionFuture; + return prepareSessionFuture; } else { - return super.submitPendingAntiCompaction(session, executor); + return super.prepareSession(repairManager, sessionID, tables, ranges, executor); } } @@ -152,9 +158,9 @@ public class LocalSessionTest extends AbstractRepairTest public LocalSession prepareForTest(UUID sessionID) { - pendingAntiCompactionFuture = SettableFuture.create(); + prepareSessionFuture = SettableFuture.create(); handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); - pendingAntiCompactionFuture.set(new Object()); + prepareSessionFuture.set(new Object()); sentMessages.clear(); return getSession(sessionID); } @@ -254,10 +260,10 @@ public class LocalSessionTest extends AbstractRepairTest sessions.start(); // replacing future so we can inspect state before and after anti compaction callback - sessions.pendingAntiCompactionFuture = SettableFuture.create(); - Assert.assertFalse(sessions.submitPendingAntiCompactionCalled); + sessions.prepareSessionFuture = SettableFuture.create(); + Assert.assertFalse(sessions.prepareSessionCalled); sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); - Assert.assertTrue(sessions.submitPendingAntiCompactionCalled); + Assert.assertTrue(sessions.prepareSessionCalled); Assert.assertTrue(sessions.sentMessages.isEmpty()); // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING @@ -267,7 +273,7 @@ public class LocalSessionTest extends AbstractRepairTest Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); // anti compaction has now finished, so state in memory and on disk should be PREPARED - sessions.pendingAntiCompactionFuture.set(new Object()); + sessions.prepareSessionFuture.set(new Object()); session = sessions.getSession(sessionID); Assert.assertNotNull(session); Assert.assertEquals(PREPARED, session.getState()); @@ -289,10 +295,10 @@ public class LocalSessionTest extends AbstractRepairTest sessions.start(); // replacing future so we can inspect state before and after anti compaction callback - sessions.pendingAntiCompactionFuture = SettableFuture.create(); - Assert.assertFalse(sessions.submitPendingAntiCompactionCalled); + sessions.prepareSessionFuture = SettableFuture.create(); + Assert.assertFalse(sessions.prepareSessionCalled); sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); - Assert.assertTrue(sessions.submitPendingAntiCompactionCalled); + Assert.assertTrue(sessions.prepareSessionCalled); Assert.assertTrue(sessions.sentMessages.isEmpty()); // anti compaction hasn't finished yet, so state in memory and on disk should be PREPARING @@ -302,7 +308,7 @@ public class LocalSessionTest extends AbstractRepairTest Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); // anti compaction has now finished, so state in memory and on disk should be PREPARED - sessions.pendingAntiCompactionFuture.setException(new RuntimeException()); + sessions.prepareSessionFuture.setException(new RuntimeException()); session = sessions.getSession(sessionID); Assert.assertNotNull(session); Assert.assertEquals(FAILED, session.getState()); @@ -657,7 +663,7 @@ public class LocalSessionTest extends AbstractRepairTest UUID sessionID = registerSession(); InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); sessions.start(); - sessions.pendingAntiCompactionFuture = SettableFuture.create(); // prevent moving to prepared + sessions.prepareSessionFuture = SettableFuture.create(); // prevent moving to prepared sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); LocalSession session = sessions.getSession(sessionID); @@ -684,9 +690,9 @@ public class LocalSessionTest extends AbstractRepairTest UUID sessionID = registerSession(); InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); sessions.start(); - sessions.pendingAntiCompactionFuture = SettableFuture.create(); + sessions.prepareSessionFuture = SettableFuture.create(); sessions.handlePrepareMessage(PARTICIPANT1, new PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS)); - sessions.pendingAntiCompactionFuture.set(new Object()); + sessions.prepareSessionFuture.set(new Object()); Assert.assertTrue(sessions.isSessionInProgress(sessionID)); sessions.failSession(sessionID); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java deleted file mode 100644 index 213cdd3..0000000 --- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair.consistent; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.statements.CreateTableStatement; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.ByteOrderedPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.Transactional; - -public class PendingAntiCompactionTest -{ - private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class); - private static final Collection<Range<Token>> FULL_RANGE; - static - { - DatabaseDescriptor.daemonInitialization(); - Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); - FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken)); - } - - private String ks; - private final String tbl = "tbl"; - private TableMetadata cfm; - private ColumnFamilyStore cfs; - - @BeforeClass - public static void setupClass() - { - SchemaLoader.prepareServer(); - } - - @Before - public void setup() - { - ks = "ks_" + System.currentTimeMillis(); - cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build(); - SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); - cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); - } - - private void makeSSTables(int num) - { - for (int i = 0; i < num; i++) - { - int val = i * 2; // multiplied to prevent ranges from overlapping - QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val, val); - QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val+1, val+1); - cfs.forceBlockingFlush(); - } - Assert.assertEquals(num, cfs.getLiveSSTables().size()); - } - - private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback - { - public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) - { - super(parentRepairSession, ranges); - } - - Set<TableId> submittedCompactions = new HashSet<>(); - - ListenableFuture<?> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result) - { - submittedCompactions.add(result.cfs.metadata.id); - result.abort(); // prevent ref leak complaints - return ListenableFutureTask.create(() -> {}, null); - } - } - - /** - * verify the pending anti compaction happy path - */ - @Test - public void successCase() throws Exception - { - Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass()); - cfs.disableAutoCompaction(); - - // create 2 sstables, one that will be split, and another that will be moved - for (int i = 0; i < 8; i++) - { - QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); - } - cfs.forceBlockingFlush(); - for (int i = 8; i < 12; i++) - { - QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i); - } - cfs.forceBlockingFlush(); - Assert.assertEquals(2, cfs.getLiveSSTables().size()); - - Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6)); - Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16)); - Collection<Range<Token>> ranges = Collections.singleton(new Range<>(left, right)); - - // create a session so the anti compaction can fine it - UUID sessionID = UUIDGen.getTimeUUID(); - ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE); - - PendingAntiCompaction pac; - ExecutorService executor = Executors.newSingleThreadExecutor(); - try - { - pac = new PendingAntiCompaction(sessionID, ranges, executor); - pac.run().get(); - } - finally - { - executor.shutdown(); - } - - Assert.assertEquals(3, cfs.getLiveSSTables().size()); - int pendingRepair = 0; - for (SSTableReader sstable : cfs.getLiveSSTables()) - { - if (sstable.isPendingRepair()) - pendingRepair++; - } - Assert.assertEquals(2, pendingRepair); - } - - @Test - public void acquisitionSuccess() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(6); - List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); - List<SSTableReader> expected = sstables.subList(0, 3); - Collection<Range<Token>> ranges = new HashSet<>(); - for (SSTableReader sstable : expected) - { - ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - } - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID()); - - logger.info("SSTables: {}", sstables); - logger.info("Expected: {}", expected); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - logger.info("Originals: {}", result.txn.originals()); - Assert.assertEquals(3, result.txn.originals().size()); - for (SSTableReader sstable : expected) - { - logger.info("Checking {}", sstable); - Assert.assertTrue(result.txn.originals().contains(sstable)); - } - - Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); - result.abort(); - } - - @Test - public void repairedSSTablesAreNotAcquired() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); - Assert.assertEquals(2, sstables.size()); - SSTableReader repaired = sstables.get(0); - SSTableReader unrepaired = sstables.get(1); - Assert.assertTrue(repaired.intersects(FULL_RANGE)); - Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); - - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null); - repaired.reloadSSTableMetadata(); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - - logger.info("Originals: {}", result.txn.originals()); - Assert.assertEquals(1, result.txn.originals().size()); - Assert.assertTrue(result.txn.originals().contains(unrepaired)); - result.abort(); // release sstable refs - } - - @Test - public void pendingRepairSSTablesAreNotAcquired() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables()); - Assert.assertEquals(2, sstables.size()); - SSTableReader repaired = sstables.get(0); - SSTableReader unrepaired = sstables.get(1); - Assert.assertTrue(repaired.intersects(FULL_RANGE)); - Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); - - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID()); - repaired.reloadSSTableMetadata(); - Assert.assertTrue(repaired.isPendingRepair()); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - - logger.info("Originals: {}", result.txn.originals()); - Assert.assertEquals(1, result.txn.originals().size()); - Assert.assertTrue(result.txn.originals().contains(unrepaired)); - result.abort(); // releases sstable refs - } - - @Test - public void pendingRepairNoSSTablesExist() throws Exception - { - cfs.disableAutoCompaction(); - - Assert.assertEquals(0, cfs.getLiveSSTables().size()); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - - result.abort(); // There's nothing to release, but we should exit cleanly - } - - /** - * anti compaction task should be submitted if everything is ok - */ - @Test - public void callbackSuccess() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); - Assert.assertTrue(cb.submittedCompactions.isEmpty()); - cb.apply(Lists.newArrayList(result)); - - Assert.assertEquals(1, cb.submittedCompactions.size()); - Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); - } - - /** - * If one of the supplied AcquireResults is null, either an Exception was thrown, or - * we couldn't get a transaction for the sstables. In either case we need to cancel the repair, and release - * any sstables acquired for other tables - */ - @Test - public void callbackNullResult() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); - - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); - Assert.assertTrue(cb.submittedCompactions.isEmpty()); - cb.apply(Lists.newArrayList(result, null)); - - Assert.assertTrue(cb.submittedCompactions.isEmpty()); - Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state()); - } - - /** - * If an AcquireResult has a null txn, there were no sstables to acquire references - * for, so no anti compaction should have been submitted. - */ - @Test - public void callbackNullTxn() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - Assert.assertNotNull(result); - - ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id); - PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null); - - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); - Assert.assertTrue(cb.submittedCompactions.isEmpty()); - cb.apply(Lists.newArrayList(result, fakeResult)); - - Assert.assertEquals(1, cb.submittedCompactions.size()); - Assert.assertTrue(cb.submittedCompactions.contains(cfm.id)); - Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id)); - } - - - @Test - public void singleAnticompaction() throws Exception - { - cfs.disableAutoCompaction(); - makeSSTables(2); - - PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); - PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); - UUID sessionID = UUIDGen.getTimeUUID(); - ActiveRepairService.instance.registerParentRepairSession(sessionID, - InetAddressAndPort.getByName("127.0.0.1"), - Lists.newArrayList(cfs), - FULL_RANGE, - true,0, - true, - PreviewKind.NONE); - CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn, - ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID); - - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index c4b0a9c..294731a 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -264,21 +264,19 @@ public class ActiveRepairServiceTest ColumnFamilyStore store = prepareColumnFamilyStore(); UUID prsId = UUID.randomUUID(); Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())); ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddressAndPort(), Collections.singletonList(store), - Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), - store.getPartitioner().getMinimumToken())), - true, System.currentTimeMillis(), true, PreviewKind.NONE); - ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); + ranges, true, System.currentTimeMillis(), true, PreviewKind.NONE); + store.getRepairManager().snapshot(prsId.toString(), ranges, false); UUID prsId2 = UUID.randomUUID(); ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddressAndPort(), Collections.singletonList(store), - Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), - store.getPartitioner().getMinimumToken())), + ranges, true, System.currentTimeMillis(), true, PreviewKind.NONE); createSSTables(store, 2); - ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); + store.getRepairManager().snapshot(prsId.toString(), ranges, false); try (Refs<SSTableReader> refs = store.getSnapshotSSTableReaders(prsId.toString())) { assertEquals(original, Sets.newHashSet(refs.iterator())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org