Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6

Branch: refs/heads/trunk
Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d
Parents: 5cebd1f c70ce63
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Sep 29 14:32:49 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Sep 29 14:32:49 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   9 +-
 .../org/apache/cassandra/utils/MerkleTree.java  |  10 ++
 .../org/apache/cassandra/utils/MerkleTrees.java |  10 ++
 .../db/compaction/CompactionsTest.java          |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  | 167 ++++++++++++-------
 7 files changed, 146 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2f8dac,97bc70a..9076e7a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,77 -1,13 +1,78 @@@
 -2.2.9
 +3.0.10
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 +Merged from 2.2:
+  * Fix merkle tree depth calculation (CASSANDRA-12580)
+  * Make Collections deserialization more robust (CASSANDRA-12618)
 - 
 - 
 -2.2.8
   * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253)
   * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block
    (CASSANDRA-12554)
 +Merged from 2.1:
-  * Make Collections deserialization more robust (CASSANDRA-12618)
 + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363)
 +
 +
 +3.0.9
 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423)
 + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195)
 + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126)
 + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527)
 + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565)
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418)
 + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609)
 +Merged from 2.2:
   * Add local address entry in PropertyFileSnitch (CASSANDRA-11332)
   * cqlshlib tests: increase default execute timeout (CASSANDRA-12481)
   * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 99e0fd5,78fa23c..4d1757e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen
                  if (validator.gcBefore > 0)
                      gcBefore = validator.gcBefore;
                  else
 -                    gcBefore = getDefaultGcBefore(cfs);
 +                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
              }
  
 -            // Create Merkle tree suitable to hold estimated partitions for given range.
 -            // We blindly assume that partition is evenly distributed on all sstables for now.
 -            long numPartitions = 0;
 -            for (SSTableReader sstable : sstables)
 -            {
 -                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
 -            }
 -            // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263)
 -            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0;
 -            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 -
 +            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
 +            // We blindly assume that a partition is evenly distributed on all sstables for now.
-             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
 +            MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
              long start = System.nanoTime();
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
 +            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
 +                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
 +                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
              {
 -                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (ci.hasNext())
                  {
 -                    // validate the CF as we iterate over it
 -                    validator.prepare(cfs, tree);
 -                    while (iter.hasNext())
 +                    if (ci.isStopRequested())
 +                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 +                    try (UnfilteredRowIterator partition = ci.next())
                      {
 -                        if (ci.isStopRequested())
 -                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                        AbstractCompactedRow row = iter.next();
 -                        validator.add(row);
 +                        validator.add(partition);
                      }
 -                    validator.complete();
                  }
 -                finally
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                if (isSnapshotValidation && !isGlobalSnapshotValidation)
                  {
                      // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
                      // is done).
@@@ -1144,37 -1167,6 +1143,40 @@@
          }
      }
  
 +    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = 0;
 +        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = 0;
 +            for (SSTableReader sstable : sstables)
 +                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
 +            rangePartitionCounts.put(range, numPartitions);
 +            allPartitions += numPartitions;
 +        }
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
++            // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress,
++            // capping at 20 to prevent large tree (CASSANDRA-11390)
 +            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
-             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
++            // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
 +            int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
      private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
      {
          Refs<SSTableReader> sstables;

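Note on the new depth calculation in createMerkleTrees above: each range now gets its own tree, and the per-range depth is capped in proportion to the share of the estimated partitions that range owns, so the combined trees stay within roughly the same memory budget as the single capped tree used before. The following standalone sketch reproduces only that arithmetic; the class and method names are made up for illustration and are not part of the patch.

    // Standalone sketch of the per-range depth math used in createMerkleTrees above.
    // Names are illustrative; only the arithmetic mirrors the patch.
    public class MerkleDepthSketch
    {
        static int depthFor(long numPartitions, long allPartitions)
        {
            double ratio = allPartitions > 0 ? (double) numPartitions / allPartitions : 0;
            // a range owning a smaller share gets a proportionally smaller cap, never above 20
            int maxDepth = ratio > 0 ? (int) Math.floor(20 - Math.log(1 / ratio) / Math.log(2)) : 0;
            // tree depth follows the estimated partition count, clamped to the cap
            return numPartitions > 0
                 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth)
                 : 0;
        }

        public static void main(String[] args)
        {
            // e.g. a range owning half of 1,000,000 estimated partitions:
            // maxDepth = floor(20 - log2(2)) = 19, depth = min(ceil(log2(500000)), 19) = 19
            System.out.println(depthFor(500_000, 1_000_000));   // prints 19
        }
    }
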
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/Validator.java
index 217c9de,8dbb4cf..9baa358
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna
          validated = 0;
          range = null;
          ranges = null;
+         this.evenTreeDistribution = evenTreeDistribution;
      }
  
 -    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
 +    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
      {
 -        this.tree = tree;
 +        this.trees = tree;
  
-         if (!tree.partitioner().preservesOrder())
+         if (!tree.partitioner().preservesOrder() || evenTreeDistribution)
          {
              // You can't beat an even tree distribution for md5
              tree.init();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java
index b950b3b,0000000..4ae55ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/MerkleTrees.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@@ -1,436 -1,0 +1,446 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.utils;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.PeekingIterator;
 +import org.slf4j.Logger;
 +
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +
 +
 +/**
 + * Wrapper class for handling of multiple MerkleTrees at once.
 + * 
 + * The MerkleTree's are divided in Ranges of non-overlapping tokens.
 + */
 +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
 +{
 +    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
 +
 +    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
 +
 +    private IPartitioner partitioner;
 +
 +    /**
 +     * Creates empty MerkleTrees object.
 +     * 
 +     * @param partitioner The partitioner to use
 +     */
 +    public MerkleTrees(IPartitioner partitioner)
 +    {
 +        this(partitioner, new ArrayList<>());
 +    }
 +
 +    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
 +    {
 +        this.partitioner = partitioner;
 +        addTrees(merkleTrees);
 +    }
 +
 +    /**
 +     * Get the ranges that these merkle trees covers.
 +     * 
 +     * @return
 +     */
 +    public Collection<Range<Token>> ranges()
 +    {
 +        return merkleTrees.keySet();
 +    }
 +
 +    /**
 +     * Get the partitioner in use.
 +     * 
 +     * @return
 +     */
 +    public IPartitioner partitioner()
 +    {
 +        return partitioner;
 +    }
 +
 +    /**
 +     * Add merkle tree's with the defined maxsize and ranges.
 +     * 
 +     * @param maxsize
 +     * @param ranges
 +     */
 +    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
 +    {
 +        for (Range<Token> range : ranges)
 +        {
 +            addMerkleTree(maxsize, range);
 +        }
 +    }
 +
 +    /**
 +     * Add a MerkleTree with the defined size and range.
 +     * 
 +     * @param maxsize
 +     * @param range
 +     * @return The created merkle tree.
 +     */
 +    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
 +    {
 +        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
 +    }
 +
 +    @VisibleForTesting
 +    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
 +    {
 +        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
 +        addTree(tree);
 +
 +        return tree;
 +    }
 +
 +    /**
 +     * Get the MerkleTree.Range responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    @VisibleForTesting
 +    public MerkleTree.TreeRange get(Token t)
 +    {
 +        return getMerkleTree(t).get(t);
 +    }
 +
 +    /**
 +     * Init all MerkleTree's with an even tree distribution.
 +     */
 +    public void init()
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            init(range);
 +        }
 +    }
 +
 +    /**
 +     * Init a selected MerkleTree with an even tree distribution.
 +     * 
 +     * @param range
 +     */
 +    public void init(Range<Token> range)
 +    {
 +        merkleTrees.get(range).init();
 +    }
 +
 +    /**
 +     * Split the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return
 +     */
 +    public boolean split(Token t)
 +    {
 +        return getMerkleTree(t).split(t);
 +    }
 +
 +    /**
 +     * Invalidate the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     */
 +    @VisibleForTesting
 +    public void invalidate(Token t)
 +    {
 +        getMerkleTree(t).invalidate(t);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token range.
 +     * 
 +     * @param range
 +     * @return
 +     */
 +    public MerkleTree getMerkleTree(Range<Token> range)
 +    {
 +        return merkleTrees.get(range);
 +    }
 +
 +    public long size()
 +    {
 +        long size = 0;
 +
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            size += tree.size();
 +        }
 +
 +        return size;
 +    }
 +
 +    @VisibleForTesting
 +    public void maxsize(Range<Token> range, int maxsize)
 +    {
 +        getMerkleTree(range).maxsize(maxsize);
 +    }
 +
 +    /**
 +     * Get the MerkleTree responsible for the given token.
 +     * 
 +     * @param t
 +     * @return The given MerkleTree or null if none exist.
 +     */
 +    private MerkleTree getMerkleTree(Token t)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (range.contains(t))
 +                return merkleTrees.get(range);
 +        }
 +
 +        throw new AssertionError("Expected tree for token " + t);
 +    }
 +
 +    private void addTrees(Collection<MerkleTree> trees)
 +    {
 +        for (MerkleTree tree : trees)
 +        {
 +            addTree(tree);
 +        }
 +    }
 +
 +    private void addTree(MerkleTree tree)
 +    {
 +        assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
 +
 +        merkleTrees.put(tree.fullRange, tree);
 +    }
 +
 +    private boolean validateNonOverlapping(MerkleTree tree)
 +    {
 +        for (Range<Token> range : merkleTrees.keySet())
 +        {
 +            if (tree.fullRange.intersects(range))
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Get an iterator for all the invalids generated by the MerkleTrees.
 +     * 
 +     * @return
 +     */
 +    public TreeRangeIterator invalids()
 +    {
 +        return new TreeRangeIterator();
 +    }
 +
 +    /**
 +     * Log the row count per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowCountPerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowCountPerLeaf().log(logger);
 +        }
 +    }
 +
 +    /**
 +     * Log the row size per leaf for all MerkleTrees.
 +     * 
 +     * @param logger
 +     */
 +    public void logRowSizePerLeaf(Logger logger)
 +    {
 +        for (MerkleTree tree : merkleTrees.values())
 +        {
 +            tree.histogramOfRowSizePerLeaf().log(logger);
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public byte[] hash(Range<Token> range)
 +    {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        boolean hashed = false;
 +
 +        try
 +        {
 +            for (Range<Token> rt : merkleTrees.keySet())
 +            {
 +                if (rt.intersects(range))
 +                {
 +                    byte[] bytes = merkleTrees.get(rt).hash(range);
 +                    if (bytes != null)
 +                    {
 +                        baos.write(bytes);
 +                        hashed = true;
 +                    }
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException("Unable to append merkle tree hash to result");
 +        }
 +        
 +        return hashed ? baos.toByteArray() : null;
 +    }
 +
 +    /**
 +     * Get an iterator of all ranges and their MerkleTrees.
 +     */
 +    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
 +    {
 +        return merkleTrees.entrySet().iterator();
 +    }
 +
++    public long rowCount()
++    {
++        long totalCount = 0;
++        for (MerkleTree tree : merkleTrees.values())
++        {
++            totalCount += tree.rowCount();
++        }
++        return totalCount;
++    }
++
 +    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements
 +            Iterable<MerkleTree.TreeRange>,
 +            PeekingIterator<MerkleTree.TreeRange>
 +    {
 +        private final Iterator<MerkleTree> it;
 +
 +        private MerkleTree.TreeRangeIterator current = null;
 +
 +        private TreeRangeIterator()
 +        {
 +            it = merkleTrees.values().iterator();
 +        }
 +
 +        public MerkleTree.TreeRange computeNext()
 +        {
 +            if (current == null || !current.hasNext())
 +                return nextIterator();
 +
 +            return current.next();
 +        }
 +
 +        private MerkleTree.TreeRange nextIterator()
 +        {
 +            if (it.hasNext())
 +            {
 +                current = it.next().invalids();
 +
 +                return current.next();
 +            }
 +
 +            return endOfData();
 +        }
 +
 +        public Iterator<MerkleTree.TreeRange> iterator()
 +        {
 +            return this;
 +        }
 +    }
 +
 +    /**
 +     * Get the differences between the two sets of MerkleTrees.
 +     * 
 +     * @param ltree
 +     * @param rtree
 +     * @return
 +     */
 +    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
 +    {
 +        List<Range<Token>> differences = new ArrayList<>();
 +        for (MerkleTree tree : ltree.merkleTrees.values())
 +        {
 +            differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
 +        }
 +        return differences;
 +    }
 +
 +    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees>
 +    {
 +        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeInt(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                MerkleTree.serializer.serialize(tree, out, version);
 +            }
 +        }
 +
 +        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            IPartitioner partitioner = null;
 +            int nTrees = in.readInt();
 +            Collection<MerkleTree> trees = new ArrayList<>(nTrees);
 +            if (nTrees > 0)
 +            {
 +                for (int i = 0; i < nTrees; i++)
 +                {
 +                    MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
 +                    trees.add(tree);
 +
 +                    if (partitioner == null)
 +                        partitioner = tree.partitioner();
 +                    else
 +                        assert tree.partitioner() == partitioner;
 +                }
 +            }
 +
 +            return new MerkleTrees(partitioner, trees);
 +        }
 +
 +        public long serializedSize(MerkleTrees trees, int version)
 +        {
 +            assert trees != null;
 +
 +            long size = TypeSizes.sizeof(trees.merkleTrees.size());
 +            for (MerkleTree tree : trees.merkleTrees.values())
 +            {
 +                size += MerkleTree.serializer.serializedSize(tree, version);
 +            }
 +            return size;
 +        }
 +
 +    }
 +
 +    private static class TokenRangeComparator implements Comparator<Range<Token>>
 +    {
 +        @Override
 +        public int compare(Range<Token> rt1, Range<Token> rt2)
 +        {
 +            if (rt1.left.compareTo(rt2.left) == 0)
 +                return 0;
 +
 +            return rt1.compareTo(rt2);
 +        }
 +    }
 +}

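For readers new to the class above: MerkleTrees simply aggregates one MerkleTree per non-overlapping token range and fans each call out to the tree owning that range. A minimal usage sketch follows, assuming it is compiled and run against the Cassandra source tree; the class name and the three hard-coded tokens are purely illustrative.

    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.utils.MerkleTrees;

    public class MerkleTreesUsageSketch
    {
        public static void main(String[] args)
        {
            IPartitioner partitioner = Murmur3Partitioner.instance;
            MerkleTrees trees = new MerkleTrees(partitioner);

            // Two non-overlapping ranges, each backed by its own MerkleTree
            // (addTree asserts that ranges never intersect).
            Token t1 = new Murmur3Partitioner.LongToken(-4611686018427387904L);
            Token t2 = new Murmur3Partitioner.LongToken(0L);
            Token t3 = new Murmur3Partitioner.LongToken(4611686018427387904L);
            trees.addMerkleTree(1 << 10, new Range<>(t1, t2));
            trees.addMerkleTree(1 << 10, new Range<>(t2, t3));

            // Even split of every tree, as Validator.prepare() does when the
            // partitioner does not preserve order or evenTreeDistribution is set.
            trees.init();

            System.out.println("ranges=" + trees.ranges().size() + ", size=" + trees.size());
        }
    }
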
http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 198b01b,471f8cf..0ce81d3
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes
          return store;
      }
  
-     private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
+     public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl)
      {
          long timestamp = System.currentTimeMillis();
 +        CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata;
          for (int i = startRowKey; i <= endRowKey; i++)
          {
              DecoratedKey key = Util.dk(Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 14f5707,61ab3da..9c32cef
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -15,13 -15,22 +15,23 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
- 
  package org.apache.cassandra.repair;
  
 -import java.io.IOException;
  import java.net.InetAddress;
 -import java.security.MessageDigest;
 +import java.util.Arrays;
+ import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
  import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.SettableFuture;
  
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.compaction.CompactionsTest;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.util.SequentialWriter;
  import org.junit.After;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.repair.messages.RepairMessage;
  import org.apache.cassandra.repair.messages.ValidationComplete;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 -import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
- import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.UUIDGen;
 -import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
 -import static org.junit.Assert.*;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
  
  public class ValidatorTest
  {
+     private static final long TEST_TIMEOUT = 60; //seconds
+ 
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
 -    private final IPartitioner partitioner = StorageService.getPartitioner();
 +    private static IPartitioner partitioner;
  
      @BeforeClass
      public static void defineSchema() throws Exception
@@@ -78,36 -92,9 +93,9 @@@
      public void testValidatorComplete() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
-         MessagingService.instance().addMessageSink(new IMessageSink()
-         {
-             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
-             {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertTrue(((ValidationComplete) m).success());
-                         assertNotNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
-                 return false;
-             }
- 
-             public boolean allowIncomingMessage(MessageIn message, int id)
-             {
-                 return false;
-             }
-         });
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
  
          InetAddress remote = InetAddress.getByName("127.0.0.2");
  
@@@ -130,37 -116,128 +118,111 @@@
          Token min = tree.partitioner().getMinimumToken();
          assertNotNull(tree.hash(new Range<>(min, min)));
  
-         if (!lock.isSignaled())
-             lock.await();
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        assertNotNull(((ValidationComplete) m).tree);
++        assertTrue(((ValidationComplete) m).success());
++        assertNotNull(((ValidationComplete) m).trees);
      }
  
 -    private static class CompactedRowStub extends AbstractCompactedRow
 -    {
 -        private CompactedRowStub(DecoratedKey key)
 -        {
 -            super(key);
 -        }
 -
 -        public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void update(MessageDigest digest) { }
 -
 -        public ColumnStats columnStats()
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void close() throws IOException { }
 -    }
  
      @Test
      public void testValidatorFailed() throws Throwable
      {
          Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
 -        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 +        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
  
-         final SimpleCondition lock = new SimpleCondition();
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+ 
+         InetAddress remote = InetAddress.getByName("127.0.0.2");
+ 
+         Validator validator = new Validator(desc, remote, 0);
+         validator.fail();
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertFalse(((ValidationComplete) m).success);
 -        assertNull(((ValidationComplete) m).tree);
++        assertFalse(((ValidationComplete) m).success());
++        assertNull(((ValidationComplete) m).trees);
+     }
+ 
+     @Test
+     public void simpleValidationTest128() throws Exception
+     {
+         simpleValidationTest(128);
+     }
+ 
+     @Test
+     public void simpleValidationTest1500() throws Exception
+     {
+         simpleValidationTest(1500);
+     }
+ 
+     /**
+      * Test for CASSANDRA-5263
+      * 1. Create N rows
+      * 2. Run validation compaction
+      * 3. Expect merkle tree with size 2^(log2(n))
+      */
+     public void simpleValidationTest(int n) throws Exception
+     {
+         Keyspace ks = Keyspace.open(keyspace);
+         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         // disable compaction while flushing
+         cfs.disableAutoCompaction();
+ 
+         CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s
+ 
+         cfs.forceBlockingFlush();
 -        assertEquals(1, cfs.getSSTables().size());
++        assertEquals(1, cfs.getLiveSSTables().size());
+ 
+         // wait enough to force single compaction
+         TimeUnit.SECONDS.sleep(5);
+ 
 -        SSTableReader sstable = cfs.getSSTables().iterator().next();
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+         UUID repairSessionId = UUIDGen.getTimeUUID();
+         final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
 -                                               cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(),
 -                                                                                           sstable.last.getToken()));
++                                               cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                                                                sstable.last.getToken())));
+ 
+         ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
 -                                                                 Collections.singletonList(cfs), Collections.singleton(desc.range),
 -                                                                 false, false);
++                                                                 Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false);
+ 
 -        final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
++        final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
+         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true);
+         CompactionManager.instance.submitValidation(cfs, validator);
+ 
+         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+         assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb);
+         RepairMessage m = (RepairMessage) message.payload;
+         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+         assertEquals(desc, m.desc);
 -        assertTrue(((ValidationComplete) m).success);
 -        MerkleTree tree = ((ValidationComplete) m).tree;
++        assertTrue(((ValidationComplete) m).success());
++        MerkleTrees trees = ((ValidationComplete) m).trees;
+ 
 -        assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0);
 -        assertEquals(tree.rowCount(), n);
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
++        while (iterator.hasNext())
++        {
++            assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
++        }
++        assertEquals(trees.rowCount(), n);
+     }
+ 
 -    private ListenableFuture<MessageOut> registerOutgoingMessageSink()
++    private CompletableFuture<MessageOut> registerOutgoingMessageSink()
+     {
 -        final SettableFuture<MessageOut> future = SettableFuture.create();
++        final CompletableFuture<MessageOut> future = new CompletableFuture<>();
          MessagingService.instance().addMessageSink(new IMessageSink()
          {
              public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
              {
-                 try
-                 {
-                     if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
-                     {
-                         RepairMessage m = (RepairMessage) message.payload;
-                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
-                         assertEquals(desc, m.desc);
-                         assertFalse(((ValidationComplete) m).success());
-                         assertNull(((ValidationComplete) m).trees);
-                     }
-                 }
-                 finally
-                 {
-                     lock.signalAll();
-                 }
 -                future.set(message);
++                future.complete(message);
                  return false;
              }
  

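As a quick sanity check of the assertion added in simpleValidationTest above: the expected leaf count is 2^ceil(log2(n)), i.e. 128 leaves for n = 128 and 2048 leaves for n = 1500. A tiny standalone check of that arithmetic (illustrative only, not part of the patch):

    public class ExpectedLeafCount
    {
        public static void main(String[] args)
        {
            for (int n : new int[] { 128, 1500 })
            {
                long leaves = (long) Math.pow(2, Math.ceil(Math.log(n) / Math.log(2)));
                System.out.println(n + " partitions -> " + leaves + " leaves");
            }
            // prints: 128 partitions -> 128 leaves
            //         1500 partitions -> 2048 leaves
        }
    }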