Merge branch 'cassandra-3.0' into cassandra-3.X

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a1f1c81
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a1f1c81
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a1f1c81

Branch: refs/heads/cassandra-3.X
Commit: 0a1f1c81e641039ca9fd573d5217b6b6f2ad8fb8
Parents: 9be467a d38a732
Author: Tyler Hobbs <[email protected]>
Authored: Fri Oct 28 15:41:02 2016 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Fri Oct 28 15:41:02 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  | 110 ++++++++++++++-----
 src/java/org/apache/cassandra/db/Mutation.java  |  12 +-
 .../cassandra/service/paxos/PaxosState.java     |   2 +-
 5 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bbd6f00,c80e045..82d3d9c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,101 -1,5 +1,102 @@@
 -3.0.10
 - * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 +3.10
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Add row offset support to SASI (CASSANDRA-11990)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
++ * Avoid deadlock due to MV lock contention (CASSANDRA-12689)
   * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
   * Include SSTable filename in compacting large row message (CASSANDRA-12384)
   * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5041186,7b32a34..4261674
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -394,7 -392,7 +394,7 @@@ public class DatabaseDescripto
              throw new ConfigurationException("concurrent_reads must be at least 2, but was " + conf.concurrent_reads, false);
          }
  
-         if (conf.concurrent_writes < 2)
 -        if (conf.concurrent_writes != null && conf.concurrent_writes < 2 && System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty())
++        if (conf.concurrent_writes < 2 && System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty())
          {
              throw new ConfigurationException("concurrent_writes must be at least 2, but was " + conf.concurrent_writes, false);
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index d3f5798,75aab8f..cd24d0d
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -453,63 -438,76 +479,96 @@@ public class Keyspac
          if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
              throw new RuntimeException("Testing write failures");
  
 +        Lock[] locks = null;
++
          boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
          final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
  
          if (requiresViewUpdate)
          {
              mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
 -            while (true)
 -            {
 -                if (TEST_FAIL_MV_LOCKS_COUNT == 0)
 -                    lock = ViewManager.acquireLockFor(mutation.key().getKey());
 -                else
 -                    TEST_FAIL_MV_LOCKS_COUNT--;
  
 -                if (lock == null)
 +            // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
 +            Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
 +            Iterator<UUID> idIterator = columnFamilyIds.iterator();
-             locks = new Lock[columnFamilyIds.size()];
 +
++            locks = new Lock[columnFamilyIds.size()];
 +            for (int i = 0; i < columnFamilyIds.size(); i++)
 +            {
 +                UUID cfid = idIterator.next();
 +                int lockKey = Objects.hash(mutation.key().getKey(), cfid);
-                 Lock lock = ViewManager.acquireLockFor(lockKey);
-                 if (lock == null)
++                while (true)
                  {
-                     // we will either time out or retry, so release all acquired locks
-                     for (int j = 0; j < i; j++)
-                         locks[j].unlock();
 -                    // avoid throwing a WTE during commitlog replay
 -                    if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
++                    Lock lock = null;
 +
-                     // avoid throwing a WTE during commitlog replay
-                     if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
++                    if (TEST_FAIL_MV_LOCKS_COUNT == 0)
++                        lock = ViewManager.acquireLockFor(lockKey);
++                    else
++                        TEST_FAIL_MV_LOCKS_COUNT--;
++
++                    if (lock == null)
                      {
-                         logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
 -                        logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
--                        Tracing.trace("Could not acquire MV lock");
--                        if (future != null)
-                             future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
++                        // avoid throwing a WTE during commitlog replay
++                        if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                         {
 -                            future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
 -                            return mark;
++                            for (int j = 0; j < i; j++)
++                                locks[j].unlock();
++
++                            logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
++                            Tracing.trace("Could not acquire MV lock");
++                            if (future != null)
++                            {
++                                future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
++                                return mark;
++                            }
++                            else
++                                throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                         }
 -                        else
++                        else if (isDeferrable)
+                         {
 -                            throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
 -                        }
 -                    }
 -                    else if (isDeferrable)
 -                    {
 -                        //This view update can't happen right now. so rather than keep this thread busy
 -                        // we will re-apply ourself to the queue and try again later
 -                        StageManager.getStage(Stage.MUTATION).execute(() ->
 -                                apply(mutation, writeCommitLog, true, isClReplay, mark)
 -                        );
++                            for (int j = 0; j < i; j++)
++                                locks[j].unlock();
+ 
 -                        return mark;
 -                    }
 -                    else
 -                    {
 -                        // Retry lock on same thread, if mutation is not deferrable.
 -                        // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish
 -                        // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers
 -                        // being blocked by waiting for futures which will never be processed as all workers are blocked
 -                        try
 -                        {
 -                            // Wait a little bit before retrying to lock
 -                            Thread.sleep(10);
++                            // This view update can't happen right now. so rather than keep this thread busy
++                            // we will re-apply ourself to the queue and try again later
++                            StageManager.getStage(Stage.MUTATION).execute(() ->
++                                                                          apply(mutation, writeCommitLog, true, isClReplay, mark)
++                            );
++
++                            return mark;
+                         }
 -                        catch (InterruptedException e)
 +                        else
-                             throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                         {
 -                            // Just continue
++                            // Retry lock on same thread, if mutation is not deferrable.
++                            // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish
++                            // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers
++                            // being blocked by waiting for futures which will never be processed as all workers are blocked
++                            try
++                            {
++                                // Wait a little bit before retrying to lock
++                                Thread.sleep(10);
++                            }
++                            catch (InterruptedException e)
++                            {
++                                // Just continue
++                            }
++                            continue;
+                         }
 -                        // continue in while loop
                      }
 -                }
 -                else
 -                {
 -                    long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
 -                    if (!isClReplay)
 +                    else
                      {
-                         // This view update can't happen right now. so rather than keep this thread busy
-                         // we will re-apply ourself to the queue and try again later
-                         StageManager.getStage(Stage.MUTATION).execute(() ->
-                             apply(mutation, writeCommitLog, true, isClReplay, mark)
-                         );
- 
-                         return mark;
 -                        for (UUID cfid : mutation.getColumnFamilyIds())
 -                            columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
++                        locks[i] = lock;
                      }
-                 }
-                 else
-                 {
-                     locks[i] = lock;
+                     break;
                  }
              }
 +
 +            long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
 +            if (!isClReplay)
 +            {
 +                for(UUID cfid : columnFamilyIds)
 +                    columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
 +            }
          }
          int nowInSec = FBUtilities.nowInSeconds();
          try (OpOrder.Group opGroup = writeOrder.start())
@@@ -561,11 -559,8 +620,12 @@@
          }
          finally
          {
 -            if (lock != null)
 -                lock.unlock();
 +            if (locks != null)
 +            {
 +                for (Lock lock : locks)
-                     lock.unlock();
++                    if (lock != null)
++                        lock.unlock();
 +            }
          }
      }
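
Note on the Keyspace.java change above: the merged code takes one view lock per table touched by the mutation and, when any single acquisition fails, releases the locks already held before timing out or deferring. The following is a minimal, self-contained sketch of that all-or-nothing tryLock() idea in isolation. It is not the Cassandra code: the class TryLockAllSketch and the methods tryLockAll and unlockAll are hypothetical names chosen for illustration, and plain ReentrantLock instances stand in for whatever ViewManager.acquireLockFor() returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names exist in Cassandra.
public class TryLockAllSketch
{
    /**
     * Tries to take every lock without blocking. Returns true if all locks
     * were acquired. On the first failure, releases the locks taken so far
     * and returns false, so the caller never keeps a partial set.
     */
    public static boolean tryLockAll(Lock[] locks)
    {
        for (int i = 0; i < locks.length; i++)
        {
            if (!locks[i].tryLock())
            {
                // Back out: unlock everything acquired before index i.
                for (int j = 0; j < i; j++)
                    locks[j].unlock();
                return false;
            }
        }
        return true;
    }

    /** Releases every lock in the array; the caller must hold all of them. */
    public static void unlockAll(Lock[] locks)
    {
        for (Lock lock : locks)
            lock.unlock();
    }

    public static void main(String[] args) throws InterruptedException
    {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        Lock[] locks = { a, b };

        // Uncontended case: both locks are free, so both are acquired.
        System.out.println("uncontended: " + tryLockAll(locks));
        unlockAll(locks);

        // Contended case: a second thread holds b while we try again.
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            b.lock();
            try
            {
                held.countDown();
                release.await();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
            finally
            {
                b.unlock();
            }
        });
        holder.start();
        held.await();

        // a is acquired first, b fails, so a is released again.
        System.out.println("contended: " + tryLockAll(locks) + ", a still held: " + a.isLocked());

        release.countDown();
        holder.join();
    }
}
```

Because every acquisition is a non-blocking tryLock() and a failed attempt leaves nothing held, two callers taking the same locks in different orders cannot wait on each other indefinitely, which is the point made by the "order of lock acquisition doesn't matter" comment in the diff. What the caller does after a failed attempt (time out, requeue, or sleep and retry) is a separate decision that this sketch leaves out.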
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
