Repository: cassandra
Updated Branches:
  refs/heads/trunk 478c1a9fd -> c5a7fcaa8


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 950966f..b60088c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Predicate;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSet;
@@ -47,7 +47,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -59,7 +59,6 @@ import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@ -549,6 +548,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
      */
     public static class ParentRepairSession
     {
+        private final Keyspace keyspace;
         private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
         private final Collection<Range<Token>> ranges;
         public final boolean isIncremental;
@@ -560,10 +560,16 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         public ParentRepairSession(InetAddressAndPort coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind 
previewKind)
         {
             this.coordinator = coordinator;
+            Set<Keyspace> keyspaces = new HashSet<>();
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
+                keyspaces.add(cfs.keyspace);
                 this.columnFamilyStores.put(cfs.metadata.id, cfs);
             }
+
+            Preconditions.checkArgument(keyspaces.size() == 1, "repair 
sessions cannot operate on multiple keyspaces");
+            this.keyspace = Iterables.getOnlyElement(keyspaces);
+
             this.ranges = ranges;
             this.repairedAt = repairedAt;
             this.isIncremental = isIncremental;
@@ -576,42 +582,14 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             return previewKind != PreviewKind.NONE;
         }
 
-        public Predicate<SSTableReader> getPreviewPredicate()
-        {
-            switch (previewKind)
-            {
-                case ALL:
-                    return (s) -> true;
-                case REPAIRED:
-                    return (s) -> s.isRepaired();
-                case UNREPAIRED:
-                    return (s) -> !s.isRepaired();
-                default:
-                    throw new RuntimeException("Can't get preview predicate 
for preview kind " + previewKind);
-            }
-        }
-
-        public synchronized void maybeSnapshot(TableId tableId, UUID 
parentSessionId)
+        public Collection<ColumnFamilyStore> getColumnFamilyStores()
         {
-            String snapshotName = parentSessionId.toString();
-            if (!columnFamilyStores.get(tableId).snapshotExists(snapshotName))
-            {
-                Set<SSTableReader> snapshottedSSTables = 
columnFamilyStores.get(tableId).snapshot(snapshotName, new 
Predicate<SSTableReader>()
-                {
-                    public boolean apply(SSTableReader sstable)
-                    {
-                        return sstable != null &&
-                               (!isIncremental || !sstable.isRepaired()) &&
-                               !(sstable.metadata().isIndex()) && // exclude 
SSTables from 2i
-                               new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges);
-                    }
-                }, true, false);
-            }
+            return 
ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();
         }
 
-        public Collection<ColumnFamilyStore> getColumnFamilyStores()
+        public Keyspace getKeyspace()
         {
-            return 
ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();
+            return keyspace;
         }
 
         public Set<TableId> getTableIds()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
deleted file mode 100644
index 8290adf..0000000
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.compaction;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.repair.RepairJobDesc;
-import org.apache.cassandra.repair.Validator;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * Tests correct sstables are returned from 
CompactionManager.getSSTablesForValidation
- * for consistent, legacy incremental, and full repairs
- */
-public class CompactionManagerGetSSTablesForValidationTest
-{
-    private String ks;
-    private static final String tbl = "tbl";
-    private ColumnFamilyStore cfs;
-    private static InetAddressAndPort coordinator;
-
-    private static Token MT;
-
-    private SSTableReader repaired;
-    private SSTableReader unrepaired;
-    private SSTableReader pendingRepair;
-
-    private UUID sessionID;
-    private RepairJobDesc desc;
-
-    @BeforeClass
-    public static void setupClass() throws Exception
-    {
-        SchemaLoader.prepareServer();
-        coordinator = InetAddressAndPort.getByName("10.0.0.1");
-        MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
-    }
-
-    @Before
-    public void setup() throws Exception
-    {
-        ks = "ks_" + System.currentTimeMillis();
-        TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE 
TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
-        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
-        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
-    }
-
-    private void makeSSTables()
-    {
-        for (int i=0; i<3; i++)
-        {
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES(?, ?)", ks, tbl), i, i);
-            cfs.forceBlockingFlush();
-        }
-        Assert.assertEquals(3, cfs.getLiveSSTables().size());
-
-    }
-
-    private void registerRepair(boolean incremental) throws Exception
-    {
-        sessionID = UUIDGen.getTimeUUID();
-        Range<Token> range = new Range<>(MT, MT);
-        ActiveRepairService.instance.registerParentRepairSession(sessionID,
-                                                                 coordinator,
-                                                                 
Lists.newArrayList(cfs),
-                                                                 
Sets.newHashSet(range),
-                                                                 incremental,
-                                                                 incremental ? 
System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                 true,
-                                                                 
PreviewKind.NONE);
-        desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, 
Collections.singleton(range));
-    }
-
-    private void modifySSTables() throws Exception
-    {
-        Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
-
-        repaired = iter.next();
-        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
System.currentTimeMillis(), null);
-        repaired.reloadSSTableMetadata();
-
-        pendingRepair = iter.next();
-        
pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor,
 ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
-        pendingRepair.reloadSSTableMetadata();
-
-        unrepaired = iter.next();
-
-        Assert.assertFalse(iter.hasNext());
-    }
-
-    @Test
-    public void consistentRepair() throws Exception
-    {
-        makeSSTables();
-        registerRepair(true);
-        modifySSTables();
-
-        // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
-        Set<SSTableReader> sstables = 
Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, 
validator));
-        Assert.assertNotNull(sstables);
-        Assert.assertEquals(1, sstables.size());
-        Assert.assertTrue(sstables.contains(pendingRepair));
-    }
-
-    @Test
-    public void legacyIncrementalRepair() throws Exception
-    {
-        makeSSTables();
-        registerRepair(true);
-        modifySSTables();
-
-        // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
-        Set<SSTableReader> sstables = 
Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, 
validator));
-        Assert.assertNotNull(sstables);
-        Assert.assertEquals(2, sstables.size());
-        Assert.assertTrue(sstables.contains(pendingRepair));
-        Assert.assertTrue(sstables.contains(unrepaired));
-    }
-
-    @Test
-    public void fullRepair() throws Exception
-    {
-        makeSSTables();
-        registerRepair(false);
-        modifySSTables();
-
-        // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
-        Set<SSTableReader> sstables = 
Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, 
validator));
-        Assert.assertNotNull(sstables);
-        Assert.assertEquals(3, sstables.size());
-        Assert.assertTrue(sstables.contains(pendingRepair));
-        Assert.assertTrue(sstables.contains(unrepaired));
-        Assert.assertTrue(sstables.contains(repaired));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 567984d..f7e9b90 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.repair.ValidationManager;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
@@ -204,7 +205,8 @@ public class LeveledCompactionStrategyTest
                                                                  
PreviewKind.NONE);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
         Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddressAndPort(), gcBefore, PreviewKind.NONE);
-        CompactionManager.instance.submitValidation(cfs, validator).get();
+
+        ValidationManager.instance.submitValidation(cfs, validator).get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
 
b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
new file mode 100644
index 0000000..365ad7e
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static 
org.apache.cassandra.db.repair.CassandraValidationIterator.getSSTablesToValidate;
+
+/**
+ * Tests correct sstables are returned from 
CompactionManager.getSSTablesForValidation
+ * for consistent, legacy incremental, and full repairs
+ */
+public class CompactionManagerGetSSTablesForValidationTest
+{
+    private String ks;
+    private static final String tbl = "tbl";
+    private ColumnFamilyStore cfs;
+    private static InetAddressAndPort coordinator;
+
+    private static Token MT;
+
+    private SSTableReader repaired;
+    private SSTableReader unrepaired;
+    private SSTableReader pendingRepair;
+
+    private UUID sessionID;
+    private RepairJobDesc desc;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        coordinator = InetAddressAndPort.getByName("10.0.0.1");
+        MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
+    }
+
+    @Before
+    public void setup() throws Exception
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE 
TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    private void makeSSTables()
+    {
+        for (int i=0; i<3; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES(?, ?)", ks, tbl), i, i);
+            cfs.forceBlockingFlush();
+        }
+        Assert.assertEquals(3, cfs.getLiveSSTables().size());
+
+    }
+
+    private void registerRepair(boolean incremental) throws Exception
+    {
+        sessionID = UUIDGen.getTimeUUID();
+        Range<Token> range = new Range<>(MT, MT);
+        ActiveRepairService.instance.registerParentRepairSession(sessionID,
+                                                                 coordinator,
+                                                                 
Lists.newArrayList(cfs),
+                                                                 
Sets.newHashSet(range),
+                                                                 incremental,
+                                                                 incremental ? 
System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                 true,
+                                                                 
PreviewKind.NONE);
+        desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, 
Collections.singleton(range));
+    }
+
+    private void modifySSTables() throws Exception
+    {
+        Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
+
+        repaired = iter.next();
+        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
System.currentTimeMillis(), null);
+        repaired.reloadSSTableMetadata();
+
+        pendingRepair = iter.next();
+        
pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor,
 ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
+        pendingRepair.reloadSSTableMetadata();
+
+        unrepaired = iter.next();
+
+        Assert.assertFalse(iter.hasNext());
+    }
+
+    @Test
+    public void consistentRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(true);
+        modifySSTables();
+
+        // get sstables for repair
+        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
+        Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, 
validator.desc.parentSessionId, validator.isIncremental));
+        Assert.assertNotNull(sstables);
+        Assert.assertEquals(1, sstables.size());
+        Assert.assertTrue(sstables.contains(pendingRepair));
+    }
+
+    @Test
+    public void legacyIncrementalRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(true);
+        modifySSTables();
+
+        // get sstables for repair
+        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
+        Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, 
validator.desc.parentSessionId, validator.isIncremental));
+        Assert.assertNotNull(sstables);
+        Assert.assertEquals(2, sstables.size());
+        Assert.assertTrue(sstables.contains(pendingRepair));
+        Assert.assertTrue(sstables.contains(unrepaired));
+    }
+
+    @Test
+    public void fullRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(false);
+        modifySSTables();
+
+        // get sstables for repair
+        Validator validator = new Validator(desc, coordinator, 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
+        Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, validator.desc.ranges, 
validator.desc.parentSessionId, validator.isIncremental));
+        Assert.assertNotNull(sstables);
+        Assert.assertEquals(3, sstables.size());
+        Assert.assertTrue(sstables.contains(pendingRepair));
+        Assert.assertTrue(sstables.contains(unrepaired));
+        Assert.assertTrue(sstables.contains(repaired));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java 
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
new file mode 100644
index 0000000..269a725
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class PendingAntiCompactionTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompactionTest.class);
+    private static final Collection<Range<Token>> FULL_RANGE;
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
+    }
+
+    private String ks;
+    private final String tbl = "tbl";
+    private TableMetadata cfm;
+    private ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Before
+    public void setup()
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k 
INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    private void makeSSTables(int num)
+    {
+        for (int i = 0; i < num; i++)
+        {
+            int val = i * 2;  // multiplied to prevent ranges from overlapping
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val, val);
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val+1, val+1);
+            cfs.forceBlockingFlush();
+        }
+        Assert.assertEquals(num, cfs.getLiveSSTables().size());
+    }
+
+    private static class InstrumentedAcquisitionCallback extends 
PendingAntiCompaction.AcquisitionCallback
+    {
+        public InstrumentedAcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
+        {
+            super(parentRepairSession, ranges);
+        }
+
+        Set<TableId> submittedCompactions = new HashSet<>();
+
+        ListenableFuture<?> 
submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result)
+        {
+            submittedCompactions.add(result.cfs.metadata.id);
+            result.abort();  // prevent ref leak complaints
+            return ListenableFutureTask.create(() -> {}, null);
+        }
+    }
+
+    /**
+     * verify the pending anti compaction happy path
+     */
+    @Test
+    public void successCase() throws Exception
+    {
+        Assert.assertSame(ByteOrderedPartitioner.class, 
DatabaseDescriptor.getPartitioner().getClass());
+        cfs.disableAutoCompaction();
+
+        // create 2 sstables, one that will be split, and another that will be 
moved
+        for (int i = 0; i < 8; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
+        }
+        cfs.forceBlockingFlush();
+        for (int i = 8; i < 12; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
+        }
+        cfs.forceBlockingFlush();
+        Assert.assertEquals(2, cfs.getLiveSSTables().size());
+
+        Token left = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
+        Token right = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
+        List<ColumnFamilyStore> tables = Lists.newArrayList(cfs);
+        Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(left, right));
+
+        // create a session so the anti compaction can fine it
+        UUID sessionID = UUIDGen.getTimeUUID();
+        ActiveRepairService.instance.registerParentRepairSession(sessionID, 
InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, 
PreviewKind.NONE);
+
+        PendingAntiCompaction pac;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try
+        {
+            pac = new PendingAntiCompaction(sessionID, tables, ranges, 
executor);
+            pac.run().get();
+        }
+        finally
+        {
+            executor.shutdown();
+        }
+
+        Assert.assertEquals(3, cfs.getLiveSSTables().size());
+        int pendingRepair = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            if (sstable.isPendingRepair())
+                pendingRepair++;
+        }
+        Assert.assertEquals(2, pendingRepair);
+    }
+
+    @Test
+    public void acquisitionSuccess() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(6);
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        List<SSTableReader> expected = sstables.subList(0, 3);
+        Collection<Range<Token>> ranges = new HashSet<>();
+        for (SSTableReader sstable : expected)
+        {
+            ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
+        }
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID());
+
+        logger.info("SSTables: {}", sstables);
+        logger.info("Expected: {}", expected);
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(3, result.txn.originals().size());
+        for (SSTableReader sstable : expected)
+        {
+            logger.info("Checking {}", sstable);
+            Assert.assertTrue(result.txn.originals().contains(sstable));
+        }
+
+        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
+        result.abort();
+    }
+
+    @Test
+    public void repairedSSTablesAreNotAcquired() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        Assert.assertEquals(2, sstables.size());
+        SSTableReader repaired = sstables.get(0);
+        SSTableReader unrepaired = sstables.get(1);
+        Assert.assertTrue(repaired.intersects(FULL_RANGE));
+        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+
+        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
1, null);
+        repaired.reloadSSTableMetadata();
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(1, result.txn.originals().size());
+        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        result.abort(); // release sstable refs
+    }
+
+    @Test
+    public void pendingRepairSSTablesAreNotAcquired() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        Assert.assertEquals(2, sstables.size());
+        SSTableReader repaired = sstables.get(0);
+        SSTableReader unrepaired = sstables.get(1);
+        Assert.assertTrue(repaired.intersects(FULL_RANGE));
+        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+
+        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
0, UUIDGen.getTimeUUID());
+        repaired.reloadSSTableMetadata();
+        Assert.assertTrue(repaired.isPendingRepair());
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        logger.info("Originals: {}", result.txn.originals());
+        Assert.assertEquals(1, result.txn.originals().size());
+        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        result.abort();  // releases sstable refs
+    }
+
+    @Test
+    public void pendingRepairNoSSTablesExist() throws Exception
+    {
+        cfs.disableAutoCompaction();
+
+        Assert.assertEquals(0, cfs.getLiveSSTables().size());
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        result.abort();  // There's nothing to release, but we should exit 
cleanly
+    }
+
+    /**
+     * anti compaction task should be submitted if everything is ok
+     */
+    @Test
+    public void callbackSuccess() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result));
+
+        Assert.assertEquals(1, cb.submittedCompactions.size());
+        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+    }
+
+    /**
+     * If one of the supplied AcquireResults is null, either an Exception was 
thrown, or
+     * we couldn't get a transaction for the sstables. In either case we need 
to cancel the repair, and release
+     * any sstables acquired for other tables
+     */
+    @Test
+    public void callbackNullResult() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result, null));
+
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, 
result.txn.state());
+    }
+
+    /**
+     * If an AcquireResult has a null txn, there were no sstables to acquire 
references
+     * for, so no anti compaction should have been submitted.
+     */
+    @Test
+    public void callbackNullTxn() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        Assert.assertNotNull(result);
+
+        ColumnFamilyStore cfs2 = 
Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system",
 "peers").id);
+        PendingAntiCompaction.AcquireResult fakeResult = new 
PendingAntiCompaction.AcquireResult(cfs2, null, null);
+
+        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        cb.apply(Lists.newArrayList(result, fakeResult));
+
+        Assert.assertEquals(1, cb.submittedCompactions.size());
+        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+        Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
+    }
+
+
+    @Test
+    public void singleAnticompaction() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        UUID sessionID = UUIDGen.getTimeUUID();
+        ActiveRepairService.instance.registerParentRepairSession(sessionID,
+                                                                 
InetAddressAndPort.getByName("127.0.0.1"),
+                                                                 
Lists.newArrayList(cfs),
+                                                                 FULL_RANGE,
+                                                                 true,0,
+                                                                 true,
+                                                                 
PreviewKind.NONE);
+        CompactionManager.instance.performAnticompaction(result.cfs, 
FULL_RANGE, result.refs, result.txn,
+                                                         
ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java 
b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 7c3dd27..3b582a9 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -199,7 +199,7 @@ public class ValidatorTest
 
         final CompletableFuture<MessageOut> outgoingMessageSink = 
registerOutgoingMessageSink();
         Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
-        CompactionManager.instance.submitValidation(cfs, validator);
+        ValidationManager.instance.submitValidation(cfs, validator);
 
         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 5fa43a9..df51444 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.repair.consistent;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,8 +41,11 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.KeyspaceRepairManager;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -128,18 +132,20 @@ public class LocalSessionTest extends AbstractRepairTest
             sentMessages.get(destination).add(message);
         }
 
-        SettableFuture<Object> pendingAntiCompactionFuture = null;
-        boolean submitPendingAntiCompactionCalled = false;
-        ListenableFuture submitPendingAntiCompaction(LocalSession session, 
ExecutorService executor)
+        SettableFuture<Object> prepareSessionFuture = null;
+        boolean prepareSessionCalled = false;
+
+        @Override
+        ListenableFuture prepareSession(KeyspaceRepairManager repairManager, 
UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> 
ranges, ExecutorService executor)
         {
-            submitPendingAntiCompactionCalled = true;
-            if (pendingAntiCompactionFuture != null)
+            prepareSessionCalled = true;
+            if (prepareSessionFuture != null)
             {
-                return pendingAntiCompactionFuture;
+                return prepareSessionFuture;
             }
             else
             {
-                return super.submitPendingAntiCompaction(session, executor);
+                return super.prepareSession(repairManager, sessionID, tables, 
ranges, executor);
             }
         }
 
@@ -152,9 +158,9 @@ public class LocalSessionTest extends AbstractRepairTest
 
         public LocalSession prepareForTest(UUID sessionID)
         {
-            pendingAntiCompactionFuture = SettableFuture.create();
+            prepareSessionFuture = SettableFuture.create();
             handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
-            pendingAntiCompactionFuture.set(new Object());
+            prepareSessionFuture.set(new Object());
             sentMessages.clear();
             return getSession(sessionID);
         }
@@ -254,10 +260,10 @@ public class LocalSessionTest extends AbstractRepairTest
         sessions.start();
 
         // replacing future so we can inspect state before and after anti 
compaction callback
-        sessions.pendingAntiCompactionFuture = SettableFuture.create();
-        Assert.assertFalse(sessions.submitPendingAntiCompactionCalled);
+        sessions.prepareSessionFuture = SettableFuture.create();
+        Assert.assertFalse(sessions.prepareSessionCalled);
         sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
-        Assert.assertTrue(sessions.submitPendingAntiCompactionCalled);
+        Assert.assertTrue(sessions.prepareSessionCalled);
         Assert.assertTrue(sessions.sentMessages.isEmpty());
 
         // anti compaction hasn't finished yet, so state in memory and on disk 
should be PREPARING
@@ -267,7 +273,7 @@ public class LocalSessionTest extends AbstractRepairTest
         Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
 
         // anti compaction has now finished, so state in memory and on disk 
should be PREPARED
-        sessions.pendingAntiCompactionFuture.set(new Object());
+        sessions.prepareSessionFuture.set(new Object());
         session = sessions.getSession(sessionID);
         Assert.assertNotNull(session);
         Assert.assertEquals(PREPARED, session.getState());
@@ -289,10 +295,10 @@ public class LocalSessionTest extends AbstractRepairTest
         sessions.start();
 
         // replacing future so we can inspect state before and after anti 
compaction callback
-        sessions.pendingAntiCompactionFuture = SettableFuture.create();
-        Assert.assertFalse(sessions.submitPendingAntiCompactionCalled);
+        sessions.prepareSessionFuture = SettableFuture.create();
+        Assert.assertFalse(sessions.prepareSessionCalled);
         sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
-        Assert.assertTrue(sessions.submitPendingAntiCompactionCalled);
+        Assert.assertTrue(sessions.prepareSessionCalled);
         Assert.assertTrue(sessions.sentMessages.isEmpty());
 
         // anti compaction hasn't finished yet, so state in memory and on disk 
should be PREPARING
@@ -302,7 +308,7 @@ public class LocalSessionTest extends AbstractRepairTest
         Assert.assertEquals(session, sessions.loadUnsafe(sessionID));
 
         // anti compaction has now finished, so state in memory and on disk 
should be PREPARED
-        sessions.pendingAntiCompactionFuture.setException(new 
RuntimeException());
+        sessions.prepareSessionFuture.setException(new RuntimeException());
         session = sessions.getSession(sessionID);
         Assert.assertNotNull(session);
         Assert.assertEquals(FAILED, session.getState());
@@ -657,7 +663,7 @@ public class LocalSessionTest extends AbstractRepairTest
         UUID sessionID = registerSession();
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.start();
-        sessions.pendingAntiCompactionFuture = SettableFuture.create();  // 
prevent moving to prepared
+        sessions.prepareSessionFuture = SettableFuture.create();  // prevent 
moving to prepared
         sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
 
         LocalSession session = sessions.getSession(sessionID);
@@ -684,9 +690,9 @@ public class LocalSessionTest extends AbstractRepairTest
         UUID sessionID = registerSession();
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.start();
-        sessions.pendingAntiCompactionFuture = SettableFuture.create();
+        sessions.prepareSessionFuture = SettableFuture.create();
         sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
-        sessions.pendingAntiCompactionFuture.set(new Object());
+        sessions.prepareSessionFuture.set(new Object());
 
         Assert.assertTrue(sessions.isSessionInProgress(sessionID));
         sessions.failSession(sessionID);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
 
b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
deleted file mode 100644
index 213cdd3..0000000
--- 
a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair.consistent;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Transactional;
-
-public class PendingAntiCompactionTest
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompactionTest.class);
-    private static final Collection<Range<Token>> FULL_RANGE;
-    static
-    {
-        DatabaseDescriptor.daemonInitialization();
-        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
-        FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
-    }
-
-    private String ks;
-    private final String tbl = "tbl";
-    private TableMetadata cfm;
-    private ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void setupClass()
-    {
-        SchemaLoader.prepareServer();
-    }
-
-    @Before
-    public void setup()
-    {
-        ks = "ks_" + System.currentTimeMillis();
-        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k 
INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
-        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
-        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
-    }
-
-    private void makeSSTables(int num)
-    {
-        for (int i = 0; i < num; i++)
-        {
-            int val = i * 2;  // multiplied to prevent ranges from overlapping
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val, val);
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), val+1, val+1);
-            cfs.forceBlockingFlush();
-        }
-        Assert.assertEquals(num, cfs.getLiveSSTables().size());
-    }
-
-    private static class InstrumentedAcquisitionCallback extends 
PendingAntiCompaction.AcquisitionCallback
-    {
-        public InstrumentedAcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
-        {
-            super(parentRepairSession, ranges);
-        }
-
-        Set<TableId> submittedCompactions = new HashSet<>();
-
-        ListenableFuture<?> 
submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult result)
-        {
-            submittedCompactions.add(result.cfs.metadata.id);
-            result.abort();  // prevent ref leak complaints
-            return ListenableFutureTask.create(() -> {}, null);
-        }
-    }
-
-    /**
-     * verify the pending anti compaction happy path
-     */
-    @Test
-    public void successCase() throws Exception
-    {
-        Assert.assertSame(ByteOrderedPartitioner.class, 
DatabaseDescriptor.getPartitioner().getClass());
-        cfs.disableAutoCompaction();
-
-        // create 2 sstables, one that will be split, and another that will be 
moved
-        for (int i = 0; i < 8; i++)
-        {
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
-        }
-        cfs.forceBlockingFlush();
-        for (int i = 8; i < 12; i++)
-        {
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(k, v) VALUES (?, ?)", ks, tbl), i, i);
-        }
-        cfs.forceBlockingFlush();
-        Assert.assertEquals(2, cfs.getLiveSSTables().size());
-
-        Token left = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
-        Token right = 
ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
-        Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(left, right));
-
-        // create a session so the anti compaction can fine it
-        UUID sessionID = UUIDGen.getTimeUUID();
-        ActiveRepairService.instance.registerParentRepairSession(sessionID, 
InetAddressAndPort.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, 
true, PreviewKind.NONE);
-
-        PendingAntiCompaction pac;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        try
-        {
-            pac = new PendingAntiCompaction(sessionID, ranges, executor);
-            pac.run().get();
-        }
-        finally
-        {
-            executor.shutdown();
-        }
-
-        Assert.assertEquals(3, cfs.getLiveSSTables().size());
-        int pendingRepair = 0;
-        for (SSTableReader sstable : cfs.getLiveSSTables())
-        {
-            if (sstable.isPendingRepair())
-                pendingRepair++;
-        }
-        Assert.assertEquals(2, pendingRepair);
-    }
-
-    @Test
-    public void acquisitionSuccess() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(6);
-        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        List<SSTableReader> expected = sstables.subList(0, 3);
-        Collection<Range<Token>> ranges = new HashSet<>();
-        for (SSTableReader sstable : expected)
-        {
-            ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
-        }
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID());
-
-        logger.info("SSTables: {}", sstables);
-        logger.info("Expected: {}", expected);
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-        logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(3, result.txn.originals().size());
-        for (SSTableReader sstable : expected)
-        {
-            logger.info("Checking {}", sstable);
-            Assert.assertTrue(result.txn.originals().contains(sstable));
-        }
-
-        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
-        result.abort();
-    }
-
-    @Test
-    public void repairedSSTablesAreNotAcquired() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        Assert.assertEquals(2, sstables.size());
-        SSTableReader repaired = sstables.get(0);
-        SSTableReader unrepaired = sstables.get(1);
-        Assert.assertTrue(repaired.intersects(FULL_RANGE));
-        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
-
-        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
1, null);
-        repaired.reloadSSTableMetadata();
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-
-        logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(1, result.txn.originals().size());
-        Assert.assertTrue(result.txn.originals().contains(unrepaired));
-        result.abort(); // release sstable refs
-    }
-
-    @Test
-    public void pendingRepairSSTablesAreNotAcquired() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        Assert.assertEquals(2, sstables.size());
-        SSTableReader repaired = sstables.get(0);
-        SSTableReader unrepaired = sstables.get(1);
-        Assert.assertTrue(repaired.intersects(FULL_RANGE));
-        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
-
-        
repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 
0, UUIDGen.getTimeUUID());
-        repaired.reloadSSTableMetadata();
-        Assert.assertTrue(repaired.isPendingRepair());
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-
-        logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(1, result.txn.originals().size());
-        Assert.assertTrue(result.txn.originals().contains(unrepaired));
-        result.abort();  // releases sstable refs
-    }
-
-    @Test
-    public void pendingRepairNoSSTablesExist() throws Exception
-    {
-        cfs.disableAutoCompaction();
-
-        Assert.assertEquals(0, cfs.getLiveSSTables().size());
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-
-        result.abort();  // There's nothing to release, but we should exit 
cleanly
-    }
-
-    /**
-     * anti compaction task should be submitted if everything is ok
-     */
-    @Test
-    public void callbackSuccess() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-
-        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
-        cb.apply(Lists.newArrayList(result));
-
-        Assert.assertEquals(1, cb.submittedCompactions.size());
-        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
-    }
-
-    /**
-     * If one of the supplied AcquireResults is null, either an Exception was 
thrown, or
-     * we couldn't get a transaction for the sstables. In either case we need 
to cancel the repair, and release
-     * any sstables acquired for other tables
-     */
-    @Test
-    public void callbackNullResult() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-        
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, 
result.txn.state());
-
-        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
-        cb.apply(Lists.newArrayList(result, null));
-
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
-        Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, 
result.txn.state());
-    }
-
-    /**
-     * If an AcquireResult has a null txn, there were no sstables to acquire 
references
-     * for, so no anti compaction should have been submitted.
-     */
-    @Test
-    public void callbackNullTxn() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        Assert.assertNotNull(result);
-
-        ColumnFamilyStore cfs2 = 
Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system",
 "peers").id);
-        PendingAntiCompaction.AcquireResult fakeResult = new 
PendingAntiCompaction.AcquireResult(cfs2, null, null);
-
-        InstrumentedAcquisitionCallback cb = new 
InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
-        cb.apply(Lists.newArrayList(result, fakeResult));
-
-        Assert.assertEquals(1, cb.submittedCompactions.size());
-        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
-        Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
-    }
-
-
-    @Test
-    public void singleAnticompaction() throws Exception
-    {
-        cfs.disableAutoCompaction();
-        makeSSTables(2);
-
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID());
-        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
-        UUID sessionID = UUIDGen.getTimeUUID();
-        ActiveRepairService.instance.registerParentRepairSession(sessionID,
-                                                                 
InetAddressAndPort.getByName("127.0.0.1"),
-                                                                 
Lists.newArrayList(cfs),
-                                                                 FULL_RANGE,
-                                                                 true,0,
-                                                                 true,
-                                                                 
PreviewKind.NONE);
-        CompactionManager.instance.performAnticompaction(result.cfs, 
FULL_RANGE, result.refs, result.txn,
-                                                         
ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index c4b0a9c..294731a 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -264,21 +264,19 @@ public class ActiveRepairServiceTest
         ColumnFamilyStore store = prepareColumnFamilyStore();
         UUID prsId = UUID.randomUUID();
         Set<SSTableReader> original = 
Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> 
!s.isRepaired())).sstables);
+        Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(store.getPartitioner().getMinimumToken(), 
store.getPartitioner().getMinimumToken()));
         ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddressAndPort(), Collections.singletonList(store),
-                                                                 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(),
-                                                                               
                    store.getPartitioner().getMinimumToken())),
-                                                                 true, 
System.currentTimeMillis(), true, PreviewKind.NONE);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
+                                                                 ranges, true, 
System.currentTimeMillis(), true, PreviewKind.NONE);
+        store.getRepairManager().snapshot(prsId.toString(), ranges, false);
 
         UUID prsId2 = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddressAndPort(),
                                                                  
Collections.singletonList(store),
-                                                                 
Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(),
-                                                                               
                    store.getPartitioner().getMinimumToken())),
+                                                                 ranges,
                                                                  true, 
System.currentTimeMillis(),
                                                                  true, 
PreviewKind.NONE);
         createSSTables(store, 2);
-        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id,
 prsId);
+        store.getRepairManager().snapshot(prsId.toString(), ranges, false);
         try (Refs<SSTableReader> refs = 
store.getSnapshotSSTableReaders(prsId.toString()))
         {
             assertEquals(original, Sets.newHashSet(refs.iterator()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to