Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05f8a008
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05f8a008
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05f8a008

Branch: refs/heads/cassandra-2.2
Commit: 05f8a008f696d9624ec85176fa0e2a1ce06a1ad5
Parents: 593bbf5 72acbcd
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Jun 13 14:34:01 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jun 13 15:00:08 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  4 ++
 .../org/apache/cassandra/db/lifecycle/View.java |  5 ++
 .../cassandra/streaming/StreamSession.java      | 22 +++----
 .../io/sstable/SSTableRewriterTest.java         | 66 ++++++++++++++++++++
 5 files changed, 86 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d639d43,ebcc90c..491f72a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,7 +1,32 @@@
 -2.1.15
 +2.2.7
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if worker processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
   * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
 - * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055)
   * cqlsh: apply current keyspace to source command (CASSANDRA-11152)
   * Backport CASSANDRA-11578 (CASSANDRA-11750)
   * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e24917c,559ba0b..d3a5028
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1588,8 -1529,12 +1588,12 @@@ public class DatabaseDescripto
  
      public static int getSSTablePreempiveOpenIntervalInMB()
      {
 -        return conf.sstable_preemptive_open_interval_in_mb;
 +        return FBUtilities.isWindows() ? -1 : conf.sstable_preemptive_open_interval_in_mb;
      }
+     public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+     {
+         conf.sstable_preemptive_open_interval_in_mb = mb;
+     }
  
      public static boolean getTrickleFsync()
      {

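For context: the new setter pairs with the existing getter so that tests can shrink the preemptive open interval and force early-opened sstables to appear during compaction. A minimal sketch of the intended usage, assuming a ColumnFamilyStore `cfs` from a test fixture (illustrative only, not part of this commit):

    // Shrink the interval so compaction early-opens sstables almost
    // immediately, then restore the previous value. Note that on Windows
    // the getter always returns -1, i.e. preemptive opening stays disabled.
    int previous = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB();
    DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
    try
    {
        cfs.forceMajorCompaction(); // the rewrite triggers early opening
    }
    finally
    {
        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(previous);
    }
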
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index fba1627,0000000..e303801
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -1,281 -1,0 +1,286 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.util.*;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Functions;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.Interval;
 +
 +import static com.google.common.base.Predicates.equalTo;
 +import static com.google.common.base.Predicates.not;
 +import static com.google.common.collect.ImmutableList.copyOf;
 +import static com.google.common.collect.ImmutableList.of;
 +import static com.google.common.collect.Iterables.all;
 +import static com.google.common.collect.Iterables.concat;
 +import static com.google.common.collect.Iterables.filter;
 +import static java.util.Collections.singleton;
 +import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
 +import static org.apache.cassandra.db.lifecycle.Helpers.replace;
 +
 +/**
 + * An immutable structure holding the current memtable, the memtables pending
 + * flush, the sstables for a column family, and the sstables that are active
 + * in compaction (a subset of the sstables).
 + *
 + * Modifications to instances are all performed via a Function produced by the static methods in this class.
 + * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
 + * accept and apply the changes to the View.
 + *
 + */
 +public class View
 +{
 +    /**
 +     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
 +     * and the new replacement memtable, until all outstanding write operations on the old table complete.
 +     * The last item in the list is always the "current" memtable.
 +     */
 +    public final List<Memtable> liveMemtables;
 +    /**
 +     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
 +     * flushed. In chronologically ascending order.
 +     */
 +    public final List<Memtable> flushingMemtables;
 +    public final Set<SSTableReader> compacting;
 +    public final Set<SSTableReader> sstables;
 +    public final Set<SSTableReader> premature;
 +    // we use a Map here so that we can easily perform identity checks as well as equality checks.
 +    // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
 +    // and we then check that not only are they all present in the live set, but that the exact instance present is
 +    // the one we made our decision to compact against.
 +    public final Map<SSTableReader, SSTableReader> sstablesMap;
 +
 +    public final SSTableIntervalTree intervalTree;
 +
 +    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
 +    {
 +        assert liveMemtables != null;
 +        assert flushingMemtables != null;
 +        assert sstables != null;
 +        assert compacting != null;
 +        assert intervalTree != null;
 +
 +        this.liveMemtables = liveMemtables;
 +        this.flushingMemtables = flushingMemtables;
 +
 +        this.sstablesMap = sstables;
 +        this.sstables = sstablesMap.keySet();
 +        this.compacting = compacting;
 +        this.premature = premature;
 +        this.intervalTree = intervalTree;
 +    }
 +
 +    public Memtable getCurrentMemtable()
 +    {
 +        return liveMemtables.get(liveMemtables.size() - 1);
 +    }
 +
 +    /**
 +     * @return the active memtable and all the memtables that are pending flush.
 +     */
 +    public Iterable<Memtable> getAllMemtables()
 +    {
 +        return concat(flushingMemtables, liveMemtables);
 +    }
 +
 +    public Sets.SetView<SSTableReader> nonCompactingSStables()
 +    {
 +        return Sets.difference(sstables, compacting);
 +    }
 +
 +    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
 +    {
 +        return filter(candidates, new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return !compacting.contains(sstable);
 +            }
 +        });
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
 +    }
 +
 +    /**
 +      * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
 +      * The interval formed by {@code left} and {@code right} shouldn't wrap.
 +      */
 +    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
 +    {
++        return sstablesInBounds(left, right, intervalTree);
++    }
++
++    public static List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right, SSTableIntervalTree intervalTree)
++    {
 +        assert !AbstractBounds.strictlyWrapsAround(left, right);
 +
 +        if (intervalTree.isEmpty())
 +            return Collections.emptyList();
 +
 +        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
 +        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
 +    }
 +
 +    // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
 +
 +    // return a function to un/mark the provided readers compacting in a view
 +    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
 +    {
 +        if (unmark.isEmpty() && Iterables.isEmpty(mark))
 +            return Functions.identity();
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                assert all(mark, Helpers.idIn(view.sstablesMap));
 +                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
 +                                replace(view.compacting, unmark, mark),
 +                                view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // construct a predicate to reject views that do not permit us to mark these readers compacting;
 +    // i.e. one of them is either already compacting, has been compacted, or has been replaced
 +    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
 +    {
 +        return new Predicate<View>()
 +        {
 +            public boolean apply(View view)
 +            {
 +                for (SSTableReader reader : readers)
 +                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
 +                        return false;
 +                return true;
 +            }
 +        };
 +    }
 +
 +    // construct a function to change the liveset in a Snapshot
 +    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
 +    {
 +        if (remove.isEmpty() && Iterables.isEmpty(add))
 +            return Functions.identity();
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
 +                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
 +                                SSTableIntervalTree.build(sstableMap.keySet()));
 +            }
 +        };
 +    }
 +
 +    // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
 +    static Function<View, View> switchMemtable(final Memtable newMemtable)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
 +                assert newLive.size() == view.liveMemtables.size() + 1;
 +                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // called before flush: move toFlush from liveMemtables to flushingMemtables
 +    static Function<View, View> markFlushing(final Memtable toFlush)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
 +                List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
 +                List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
 +                                                           of(toFlush),
 +                                                           filter(flushing, not(lessThan(toFlush)))));
 +                assert newLive.size() == live.size() - 1;
 +                assert newFlushing.size() == flushing.size() + 1;
 +                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
 +    static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
 +                assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
 +
 +                if (flushed == null)
 +                    return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
 +                                    view.compacting, view.premature, view.intervalTree);
 +
 +                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
 +                Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
 +                Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
 +                return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
 +                                SSTableIntervalTree.build(sstableMap.keySet()));
 +            }
 +        };
 +    }
 +
 +    static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
 +    {
 +        return new Function<View, View>()
 +        {
 +
 +            @Nullable
 +            public View apply(View view)
 +            {
 +                Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
 +                Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
 +                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +
 +    private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
 +    {
 +        return new Predicate<T>()
 +        {
 +            public boolean apply(T t)
 +            {
 +                return t.compareTo(lessThan) < 0;
 +            }
 +        };
 +    }
 +}

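The point of the new static sstablesInBounds overload above: a caller can search an interval tree it built itself over the canonical sstables, instead of the View's own intervalTree, which may index early-opened replacement instances. A sketch of that pattern, assuming a `view` and a list of `keyRanges` as in the streaming code below (illustrative, not part of this commit):

    // Build the tree once over canonical sstables only, then reuse it for
    // every requested range; the results can never be early-opened instances.
    SSTableIntervalTree tree = SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
    Set<SSTableReader> selected = Sets.newHashSet();
    for (Range<RowPosition> keyRange : keyRanges)
        selected.addAll(View.sstablesInBounds(keyRange.left, keyRange.right, tree));
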
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index f14b94c,273631c..f4c900e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -25,18 -25,22 +25,22 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
 -import javax.annotation.Nullable;
 -
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Function;
  import com.google.common.collect.*;
 +
++import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.RowPosition;
+ import org.apache.cassandra.dht.AbstractBounds;
+ import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.*;
@@@ -279,7 -270,7 +283,7 @@@ public class StreamSession implements I
              flushSSTables(stores);
  
          List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-         List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
 -        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE);
++        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental);
          try
          {
              addTransferFiles(sections);
@@@ -314,33 -306,21 +319,26 @@@
          {
              for (ColumnFamilyStore cfStore : stores)
              {
 -                final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
 -                final IPartitioner partitioner = cfStore.partitioner;
 +                final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
 -                    rowBoundsList.add(range.toRowBounds());
 -                refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
 +                    keyRanges.add(Range.makeRowRange(range));
 +                refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
                  {
 -                    public List<SSTableReader> apply(DataTracker.View view)
 +                    public List<SSTableReader> apply(View view)
                      {
-                         Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
-                         for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
-                             permittedInstances.put(reader, reader);
- 
 -                        DataTracker.SSTableIntervalTree intervalTree = DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
++                        SSTableIntervalTree intervalTree = SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
                          Set<SSTableReader> sstables = Sets.newHashSet();
 -                        for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
 +                        for (Range<RowPosition> keyRange : keyRanges)
                          {
 -                            for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner))
 +                            // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
 +                            // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above).
 +                            // And that latter method uses the Token.maxKeyBound() method to create the range, which returns a "fake" key that
 +                            // sorts after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
 +                            // including keyRange.left will still exclude any key having the token of the original token range, and so we're
 +                            // still actually selecting what we wanted.
-                             for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
++                            for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
                              {
-                                 // sstableInBounds may contain early opened sstables
-                                 if (isIncremental && sstable.isRepaired())
-                                     continue;
-                                 sstable = permittedInstances.get(sstable);
-                                 if (sstable != null)
+                                 if (!isIncremental || !sstable.isRepaired())
                                      sstables.add(sstable);
                              }
                          }

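One usage note on the StreamSession change above: each SSTableStreamingSections holds a reference to its sstable, so callers of getSSTableSectionsForRanges must release every section they do not hand off. In isolation the call looks roughly like this (assuming `firstToken` and `cfs` as in the test below; illustrative only):

    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(
        Collections.singleton(new Range<Token>(firstToken, firstToken)),
        Collections.singleton(cfs), 0L, false); // false: not an incremental repair
    for (StreamSession.SSTableStreamingSections section : sections)
        section.ref.release(); // always release, or the sstable can never be removed
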
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index c2cc6e3,1fb28f5..f50953a
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -21,15 -21,11 +21,18 @@@ import java.io.File
  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.*;
+ import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  
 +import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
@@@ -41,25 -35,24 +44,29 @@@ import org.apache.cassandra.db.*
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
 +import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.LazilyCompactedRow;
  import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.db.compaction.SSTableSplitter;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.notifications.INotification;
+ import org.apache.cassandra.notifications.INotificationConsumer;
+ import org.apache.cassandra.notifications.SSTableListChangedNotification;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertFalse;
 -import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.*;
  
  public class SSTableRewriterTest extends SchemaLoader
  {
@@@ -807,10 -769,76 +814,69 @@@
                  }
              }
          }
 -        writer.abort();
 -        cfs.getDataTracker().unmarkCompacting(sstables);
 -        truncate(cfs);
 +        truncateCF();
 +        validateCFS(cfs);
      }
  
+     @Test
+     public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         truncate(cfs);
+ 
+         cfs.addSSTable(writeFile(cfs, 1000));
+ 
+         Collection<SSTableReader> allSSTables = cfs.getSSTables();
+         assertEquals(1, allSSTables.size());
+         final Token firstToken = allSSTables.iterator().next().first.getToken();
+         DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+ 
+         List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
+             Collections.singleton(new Range<Token>(firstToken, firstToken)),
+             Collections.singleton(cfs), 0L, false);
+         assertEquals(1, sectionsBeforeRewrite.size());
+         for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
+             section.ref.release();
+         final AtomicInteger checkCount = new AtomicInteger();
+         // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
 -        INotificationConsumer consumer = new INotificationConsumer()
++        final AtomicBoolean done = new AtomicBoolean(false);
++        final AtomicBoolean failed = new AtomicBoolean(false);
++        Runnable r = new Runnable()
++        {
++            public void run()
++            {
++                while (!done.get())
+                 {
 -                    public void handleNotification(INotification notification, Object sender)
 -                    {
 -                        if (notification instanceof SSTableListChangedNotification)
 -                        {
 -                            Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added;
 -                            Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed;
 -                            // note that we need to check if added.equals(removed) because once the compaction is done the old sstable will have
 -                            // selfRef().globalCount() == 0 and we can't get the SectionsForRanges then. During incremental opening we always add and remove the same
 -                            // sstable (note that the sstables are x.equal(y) but not x == y since the new one will be a new instance with a moved starting point).
 -                            // In this case we must avoid trying to call getSSTableSectionsForRanges since we are in the notification
 -                            // method and trying to reference an sstable with globalcount == 0 puts it into a loop, and this blocks the tracker from removing the
 -                            // unreferenced sstable.
 -                            if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed))
 -                                return;
 -
 -                            // at no point must the rewrite process hide
 -                            // sections returned by getSSTableSectionsForRanges
 -                            Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
 -                            List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
 -                            assertEquals(1, sections.size());
 -                            for (StreamSession.SSTableStreamingSections section : sections)
 -                                section.ref.release();
 -                            checkCount.incrementAndGet();
 -                        }
 -                    }
 -                };
 -        cfs.getDataTracker().subscribe(consumer);
++                    Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
++                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
++                    if (sections.size() != 1)
++                        failed.set(true);
++                    for (StreamSession.SSTableStreamingSections section : sections)
++                        section.ref.release();
++                    checkCount.incrementAndGet();
++                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
++                }
++            }
++        };
++        Thread t = new Thread(r);
+         try
+         {
++            t.start();
+             cfs.forceMajorCompaction();
+             // reset
+         }
+         finally
+         {
+             DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
 -            cfs.getDataTracker().unsubscribe(consumer);
++            done.set(true);
++            t.join(20);
+         }
++        assertFalse(failed.get());
+         assertTrue(checkCount.get() >= 2);
+         truncate(cfs);
+     }
+ 
      /**
       * emulates anticompaction - writing from one source sstable to two new sstables
       *

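The rewritten test above drops the DataTracker notification subscription in favour of a background thread that polls the invariant while compaction runs, which avoids re-referencing an sstable from inside a notification handler. The shape of that pattern, with hypothetical invariantHolds()/doWork() helpers standing in for the test specifics (the enclosing method must declare InterruptedException):

    final AtomicBoolean done = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    Thread checker = new Thread(new Runnable()
    {
        public void run()
        {
            while (!done.get())
            {
                if (!invariantHolds())  // hypothetical: the condition under test
                    failed.set(true);
                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
            }
        }
    });
    checker.start();
    try
    {
        doWork(); // hypothetical: the concurrent workload, e.g. a compaction
    }
    finally
    {
        done.set(true);
        checker.join(20);
    }
    assertFalse(failed.get());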