Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39bcdcd3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39bcdcd3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39bcdcd3

Branch: refs/heads/trunk
Commit: 39bcdcd32ea5dcfd372d4b3eaf10c1c3de0547fb
Parents: 8b7e967 d2248f2
Author: Aleksey Yeshchenko <alek...@apple.com>
Authored: Fri Mar 23 14:49:51 2018 +0000
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Fri Mar 23 14:49:51 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++-
 .../org/apache/cassandra/db/ReadResponse.java   | 22 +++++++++++++++-
 .../db/rows/RangeTombstoneBoundMarker.java      |  2 +-
 .../db/rows/RangeTombstoneBoundaryMarker.java   |  5 +++-
 .../reads/repair/RowIteratorMergeListener.java  | 27 +++++++++++++++-----
 5 files changed, 49 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d4e5e37,987b9f7..76803a8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,213 -1,3 +1,212 @@@
 +4.0
 + * Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338)
 + * Add support for hybrid MIN(), MAX() speculative retry policies (CASSANDRA-14293)
 + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)
-  * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252)
 + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (Cassandra-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minumum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL propery (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
 +
 +
  3.11.3
   * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
   * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadResponse.java
index 2c06f65,7aa915e..486980d
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@@ -29,11 -36,12 +29,12 @@@ import org.apache.cassandra.db.rows.*
  import org.apache.cassandra.io.IVersionedSerializer;
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataInputPlus;
 -import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.thrift.ThriftResultsMerger;
++import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.HashingUtils;
  
  public abstract class ReadResponse
  {
@@@ -87,11 -114,31 +88,30 @@@
          return "<key " + key + " not found>";
      }
  
 -    private String toDebugString(UnfilteredRowIterator partition, CFMetaData metadata)
++    private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata)
+     {
+         StringBuilder sb = new StringBuilder();
+ 
 -        sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s",
 -                                metadata.ksName,
 -                                metadata.cfName,
 -                                metadata.getKeyValidator().getString(partition.partitionKey().getKey()),
++        sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s",
++                                metadata,
++                                metadata.partitionKeyType.getString(partition.partitionKey().getKey()),
+                                 partition.partitionLevelDeletion(),
+                                 partition.columns()));
+ 
+         if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
+             sb.append("\n    ").append(partition.staticRow().toString(metadata, true));
+ 
+         while (partition.hasNext())
+             sb.append("\n    ").append(partition.next().toString(metadata, true));
+ 
+         return sb.toString();
+     }
+ 
      protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
      {
 -        MessageDigest digest = FBUtilities.threadLocalMD5Digest();
 -        UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion());
 -        return ByteBuffer.wrap(digest.digest());
 +        Hasher hasher = HashingUtils.CURRENT_HASH_FUNCTION.newHasher();
 +        UnfilteredPartitionIterators.digest(iterator, hasher, command.digestVersion());
 +        return ByteBuffer.wrap(hasher.hash().asBytes());
      }
  
      private static class DigestResponse extends ReadResponse

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index a1f2b2c,f6ba149..c0c6afd
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@@ -127,15 -127,21 +127,15 @@@ public class RangeTombstoneBoundMarker 
          return new RangeTombstoneBoundMarker(clustering(), newDeletionTime);
      }
  
 -    public void digest(MessageDigest digest)
 -    {
 -        bound.digest(digest);
 -        deletion.digest(digest);
 -    }
 -
 -    @Override
 -    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
 +    public void digest(Hasher hasher)
      {
 -        digest(digest);
 +        bound.digest(hasher);
 +        deletion.digest(hasher);
      }
  
 -    public String toString(CFMetaData metadata)
 +    public String toString(TableMetadata metadata)
      {
-         return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();
+         return String.format("Marker %s@%d/%d", bound.toString(metadata), deletion.markedForDeleteAt(), deletion.localDeletionTime());
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 60b7dab,f183309..6e6c8cd
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@@ -145,16 -145,25 +145,19 @@@ public class RangeTombstoneBoundaryMark
          return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
      }
  
 -    public void digest(MessageDigest digest)
 -    {
 -        bound.digest(digest);
 -        endDeletion.digest(digest);
 -        startDeletion.digest(digest);
 -    }
 -
 -    @Override
 -    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
 +    public void digest(Hasher hasher)
      {
 -        digest(digest);
 +        bound.digest(hasher);
 +        endDeletion.digest(hasher);
 +        startDeletion.digest(hasher);
      }
  
 -    public String toString(CFMetaData metadata)
 +    public String toString(TableMetadata metadata)
      {
-         return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());
+         return String.format("Marker %s@%d/%d-%d/%d",
+                              bound.toString(metadata),
+                              endDeletion.markedForDeleteAt(), endDeletion.localDeletionTime(),
+                              startDeletion.markedForDeleteAt(), startDeletion.localDeletionTime());
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39bcdcd3/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 63bd3ce,0000000..44b6eeb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@@ -1,336 -1,0 +1,351 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.util.Arrays;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ClusteringBound;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.LivenessInfo;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.RangeTombstone;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.Slice;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.BTreeRow;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowDiffListener;
 +import org.apache.cassandra.db.rows.Rows;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
 +
 +public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
 +{
 +    private final DecoratedKey partitionKey;
 +    private final RegularAndStaticColumns columns;
 +    private final boolean isReversed;
 +    private final InetAddressAndPort[] sources;
 +    private final ReadCommand command;
 +
 +    private final PartitionUpdate.Builder[] repairs;
 +
 +    private final Row.Builder[] currentRows;
 +    private final RowDiffListener diffListener;
 +
 +    // The partition level deletion for the merge row.
 +    private DeletionTime partitionLevelDeletion;
 +    // When merged has a currently open marker, its time. null otherwise.
 +    private DeletionTime mergedDeletionTime;
 +    // For each source, the time of the current deletion as known by the source.
 +    private final DeletionTime[] sourceDeletionTime;
 +    // For each source, record if there is an open range to send as repair, and from where.
 +    private final ClusteringBound[] markerToRepair;
 +
 +    private final RepairListener repairListener;
 +
 +    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, RepairListener repairListener)
 +    {
 +        this.partitionKey = partitionKey;
 +        this.columns = columns;
 +        this.isReversed = isReversed;
 +        this.sources = sources;
 +        repairs = new PartitionUpdate.Builder[sources.length];
 +        currentRows = new Row.Builder[sources.length];
 +        sourceDeletionTime = new DeletionTime[sources.length];
 +        markerToRepair = new ClusteringBound[sources.length];
 +        this.command = command;
 +        this.repairListener = repairListener;
 +
 +        this.diffListener = new RowDiffListener()
 +        {
 +            public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +            {
 +                if (merged != null && !merged.equals(original))
 +                    currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
 +            }
 +
 +            public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +            {
 +                if (merged != null && !merged.equals(original))
 +                    currentRow(i, clustering).addRowDeletion(merged);
 +            }
 +
 +            public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
 +            {
 +                if (merged != null && !merged.equals(original))
 +                    currentRow(i, clustering).addComplexDeletion(column, merged);
 +            }
 +
 +            public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +            {
 +                if (merged != null && !merged.equals(original) && isQueried(merged))
 +                    currentRow(i, clustering).addCell(merged);
 +            }
 +
 +            private boolean isQueried(Cell cell)
 +            {
 +                // When we read, we may have some cell that have been fetched but are not selected by the user. Those cells may
 +                // have empty values as optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair.
 +                // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
 +                // semantic (making sure we can always distinguish between a row that doesn't exist from one that do exist but has
 +                /// no value for the column requested by the user) and so it won't be unexpected by the user that those columns are
 +                // not repaired.
 +                ColumnMetadata column = cell.column();
 +                ColumnFilter filter = RowIteratorMergeListener.this.command.columnFilter();
 +                return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
 +            }
 +        };
 +    }
 +
 +    private PartitionUpdate.Builder update(int i)
 +    {
 +        if (repairs[i] == null)
 +            repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
 +        return repairs[i];
 +    }
 +
 +    /**
 +     * The partition level deletion with with which source {@code i} is currently repaired, or
 +     * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
 +     * up to date on it). The output* of this method is only valid after the call to
 +     * {@link #onMergedPartitionLevelDeletion}.
 +     */
 +    private DeletionTime partitionLevelRepairDeletion(int i)
 +    {
 +        return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion();
 +    }
 +
 +    private Row.Builder currentRow(int i, Clustering clustering)
 +    {
 +        if (currentRows[i] == null)
 +        {
 +            currentRows[i] = BTreeRow.sortedBuilder();
 +            currentRows[i].newRow(clustering);
 +        }
 +        return currentRows[i];
 +    }
 +
 +    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +    {
 +        this.partitionLevelDeletion = mergedDeletion;
 +        for (int i = 0; i < versions.length; i++)
 +        {
 +            if (mergedDeletion.supersedes(versions[i]))
 +                update(i).addPartitionDeletion(mergedDeletion);
 +        }
 +    }
 +
 +    public void onMergedRows(Row merged, Row[] versions)
 +    {
 +        // If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle
 +        // those case directly in their respective methods (in other words, it would be inefficient to send a row
 +        // deletion as repair when we know we've already send a partition level or range tombstone that covers it).
 +        if (merged.isEmpty())
 +            return;
 +
 +        Rows.diff(diffListener, merged, versions);
 +        for (int i = 0; i < currentRows.length; i++)
 +        {
 +            if (currentRows[i] != null)
 +                update(i).add(currentRows[i].build());
 +        }
 +        Arrays.fill(currentRows, null);
 +    }
 +
 +    private DeletionTime currentDeletion()
 +    {
 +        return mergedDeletionTime == null ? partitionLevelDeletion : 
mergedDeletionTime;
 +    }
 +
 +    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
 +    {
 +        try
 +        {
 +            // The code for merging range tombstones is a tad complex and we 
had the assertions there triggered
 +            // unexpectedly on a few occasions (CASSANDRA-13237, 
CASSANDRA-13719). It's hard to get insights
 +            // when that happens without more context than what the assertion 
errors give us however, hence the
 +            // catch here that basically gathers as much context as 
reasonable.
 +            internalOnMergedRangeTombstoneMarkers(merged, versions);
 +        }
 +        catch (AssertionError e)
 +        {
 +            // The following can be pretty verbose, but it's really only 
triggered if a bug happens, so we'd
 +            // rather get more info to debug than not.
 +            TableMetadata table = command.metadata();
-             String details = String.format("Error merging RTs on %s: 
merged=%s, versions=%s, sources={%s}",
++            String details = String.format("Error merging RTs on %s: 
command=%s, reversed=%b, merged=%s, versions=%s, sources={%s}",
 +                                           table,
++                                           command.toCQLString(),
++                                           isReversed,
 +                                           merged == null ? "null" : 
merged.toString(table),
 +                                           '[' + Joiner.on(", 
").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" 
: rt.toString(table))) + ']',
 +                                           Arrays.toString(sources));
 +            throw new AssertionError(details, e);
 +        }
 +    }
 +
 +    private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker 
merged, RangeTombstoneMarker[] versions)
 +    {
 +        // The current deletion as of dealing with this marker.
 +        DeletionTime currentDeletion = currentDeletion();
 +
 +        for (int i = 0; i < versions.length; i++)
 +        {
 +            RangeTombstoneMarker marker = versions[i];
 +
 +            // Update what the source now thinks is the current deletion
 +            if (marker != null)
 +                sourceDeletionTime[i] = marker.isOpen(isReversed) ? 
marker.openDeletionTime(isReversed) : null;
 +
 +            // If merged == null, one of the sources is opening or closing a 
marker
 +            if (merged == null)
 +            {
 +                // but if it's not this source, move to the next one
 +                if (marker == null)
 +                    continue;
 +
 +                // We have a close and/or open marker for a source, with 
nothing corresponding in merged.
 +                // Because merged is a superset, this implies that we have a 
current deletion (be it due to an
 +                // early opening in merged or a partition level deletion) and 
that this deletion will still be
 +                // active after that point. Further, whatever deletion was 
open or is open by this marker on the
 +                // source, that deletion cannot supersede the current one.
 +                //
 +                // But while the marker deletion (before and/or after this 
point) cannot supersede the current
 +                // deletion, we want to know if it's equal to it (both before 
and after), because in that case
 +                // the source is up to date and we don't want to include 
repair.
 +                //
 +                // So in practice we have 2 possible cases:
 +                //  1) the source was up-to-date on deletion up to that 
point: then it won't be from that point
 +                //     on unless it's a boundary and the new opened deletion 
time is also equal to the current
 +                //     deletion (note that this implies the boundary has the 
same closing and opening deletion
 +                //     time, which should generally not happen, but can due 
to legacy reading code not avoiding
 +                //     this for a while, see CASSANDRA-13237).
 +                //  2) the source wasn't up-to-date on deletion up to that 
point and it may now be (if it isn't
 +                //     we just have nothing to do for that marker).
 +                assert !currentDeletion.isLive() : currentDeletion.toString();
 +
 +                // Is the source up to date on deletion? It's up to date if 
it doesn't have an open RT repair
 +                // nor an "active" partition level deletion (where "active" 
means that it's greater or equal
 +                // to the current deletion: if the source has a repaired 
partition deletion lower than the
 +                // current deletion, this means the current deletion is due 
to a previously open range tombstone,
 +                // and if the source isn't currently repaired for that RT, 
then it means it's up to date on it).
 +                DeletionTime partitionRepairDeletion = 
partitionLevelRepairDeletion(i);
 +                if (markerToRepair[i] == null && 
currentDeletion.supersedes(partitionRepairDeletion))
 +                {
-                     // Since there is an ongoing merged deletion, the only 
way we don't have an open repair for
-                     // this source is that it had a range open with the same 
deletion as current and it's
-                     // closing it.
-                     assert marker.isClose(isReversed) && 
currentDeletion.equals(marker.closeDeletionTime(isReversed))
-                     : String.format("currentDeletion=%s, marker=%s", 
currentDeletion, marker.toString(command.metadata()));
++                    /*
++                     * Since there is an ongoing merged deletion, the only 
two ways we don't have an open repair for
++                     * this source are that:
++                     *
++                     * 1) it had a range open with the same deletion as 
current marker, and the marker is coming from
++                     *    a short read protection response - repeating the 
open RT bound, or
++                     * 2) it had a range open with the same deletion as 
current marker, and the marker is closing it.
++                     */
++                    if (!marker.isBoundary() && marker.isOpen(isReversed)) // 
(1)
++                    {
++                        assert 
currentDeletion.equals(marker.openDeletionTime(isReversed))
++                            : String.format("currentDeletion=%s, marker=%s", 
currentDeletion, marker.toString(command.metadata()));
++                    }
++                    else // (2)
++                    {
++                        assert marker.isClose(isReversed) && 
currentDeletion.equals(marker.closeDeletionTime(isReversed))
++                            : String.format("currentDeletion=%s, marker=%s", 
currentDeletion, marker.toString(command.metadata()));
++                    }
 +
 +                    // and so unless it's a boundary whose opening deletion 
time is still equal to the current
 +                    // deletion (see comment above for why this can actually 
happen), we have to repair the source
 +                    // from that point on.
 +                    if (!(marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed))))
 +                        markerToRepair[i] = 
marker.closeBound(isReversed).invert();
 +                }
 +                // In case 2) above, we only have something to do if the 
source is up-to-date after that point
 +                // (which, since the source isn't up-to-date before that 
point, means we're opening a new deletion
 +                // that is equal to the current one).
 +                else if (marker.isOpen(isReversed) && 
currentDeletion.equals(marker.openDeletionTime(isReversed)))
 +                {
 +                    closeOpenMarker(i, marker.openBound(isReversed).invert());
 +                }
 +            }
 +            else
 +            {
 +                // We have a change of current deletion in merged 
(potentially to/from no deletion at all).
 +
 +                if (merged.isClose(isReversed))
 +                {
 +                    // We're closing the merged range. If we've recorded that 
this should be repaired for the
 +                    // source, close and add said range to the repair to send.
 +                    if (markerToRepair[i] != null)
 +                        closeOpenMarker(i, merged.closeBound(isReversed));
 +
 +                }
 +
 +                if (merged.isOpen(isReversed))
 +                {
 +                    // If we're opening a new merged range (or just switching 
deletion), then unless the source
 +                    // is up to date on that deletion (note that we've 
updated what the source deletion is
 +                    // above), we'll have to send the range to the source.
 +                    DeletionTime newDeletion = 
merged.openDeletionTime(isReversed);
 +                    DeletionTime sourceDeletion = sourceDeletionTime[i];
 +                    if (!newDeletion.equals(sourceDeletion))
 +                        markerToRepair[i] = merged.openBound(isReversed);
 +                }
 +            }
 +        }
 +
 +        if (merged != null)
 +            mergedDeletionTime = merged.isOpen(isReversed) ? 
merged.openDeletionTime(isReversed) : null;
 +    }
 +
 +    private void closeOpenMarker(int i, ClusteringBound close)
 +    {
 +        ClusteringBound open = markerToRepair[i];
 +        update(i).add(new RangeTombstone(Slice.make(isReversed ? close : 
open, isReversed ? open : close), currentDeletion()));
 +        markerToRepair[i] = null;
 +    }
 +
 +    public void close()
 +    {
 +        RepairListener.PartitionRepair repair = null;
 +        for (int i = 0; i < repairs.length; i++)
 +        {
 +            if (repairs[i] == null)
 +                continue;
 +
 +            if (repair == null)
 +            {
 +                repair = repairListener.startPartitionRepair();
 +            }
 +            repair.reportMutation(sources[i], new 
Mutation(repairs[i].build()));
 +        }
 +
 +        if (repair != null)
 +        {
 +            repair.finish();
 +        }
 +    }
 +}
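
For readers following the assertion change in internalOnMergedRangeTombstoneMarkers, the two accepted cases can be sketched in isolation. This is only an illustration under stated assumptions, not the committed code: MarkerCheckSketch, Marker, isConsistent and mustStartRepair are hypothetical names, deletion times are reduced to plain long timestamps, and the isReversed handling is left out entirely.

```java
public class MarkerCheckSketch
{
    /** Hypothetical stand-in for RangeTombstoneMarker; deletions are plain timestamps here. */
    public static final class Marker
    {
        final Long closeDeletion; // null when the marker closes nothing
        final Long openDeletion;  // null when the marker opens nothing

        public Marker(Long closeDeletion, Long openDeletion)
        {
            this.closeDeletion = closeDeletion;
            this.openDeletion = openDeletion;
        }

        boolean isBoundary() { return closeDeletion != null && openDeletion != null; }
        boolean isOpen()     { return openDeletion != null; }
        boolean isClose()    { return closeDeletion != null; }
    }

    /**
     * Mirrors the shape of the two assertions in the diff: with no repair open for a
     * source and nothing in merged, the source marker is either
     * (1) a plain open bound repeating the current deletion (the short read protection case), or
     * (2) a marker closing a range that carried the current deletion.
     */
    public static boolean isConsistent(long current, Marker marker)
    {
        if (!marker.isBoundary() && marker.isOpen()) // (1)
            return marker.openDeletion.longValue() == current;

        return marker.isClose() && marker.closeDeletion.longValue() == current; // (2)
    }

    /**
     * Mirrors the follow-up check: a repair range is started from this point unless the
     * marker (re)opens a deletion equal to the current one.
     */
    public static boolean mustStartRepair(long current, Marker marker)
    {
        return !(marker.isOpen() && marker.openDeletion.longValue() == current);
    }
}
```

In this sketch a repeated open bound carrying the current deletion passes the check and starts no repair, which is the situation the old single assertion rejected. A plain close bound passes as well, but does start a repair from that point on.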

