Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a1f1c81 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a1f1c81 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a1f1c81 Branch: refs/heads/cassandra-3.X Commit: 0a1f1c81e641039ca9fd573d5217b6b6f2ad8fb8 Parents: 9be467a d38a732 Author: Tyler Hobbs <[email protected]> Authored: Fri Oct 28 15:41:02 2016 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Fri Oct 28 15:41:02 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 110 ++++++++++++++----- src/java/org/apache/cassandra/db/Mutation.java | 12 +- .../cassandra/service/paxos/PaxosState.java | 2 +- 5 files changed, 92 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index bbd6f00,c80e045..82d3d9c --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,101 -1,5 +1,102 @@@ -3.0.10 - * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689) +3.10 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Add row offset support to SASI (CASSANDRA-11990) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) (CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: ++ * Avoid deadlock due to MV lock contention (CASSANDRA-12689) * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801) * Include SSTable filename in compacting large row message (CASSANDRA-12384) * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 5041186,7b32a34..4261674 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -394,7 -392,7 +394,7 @@@ public class DatabaseDescripto throw new ConfigurationException("concurrent_reads must be at least 2, but was " + conf.concurrent_reads, false); } - if (conf.concurrent_writes < 2) - if (conf.concurrent_writes != null && conf.concurrent_writes < 2 && System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty()) ++ if (conf.concurrent_writes < 2 && System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty()) { throw new ConfigurationException("concurrent_writes must be at least 2, but was " + conf.concurrent_writes, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index d3f5798,75aab8f..cd24d0d --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -453,63 -438,76 +479,96 @@@ public class Keyspac if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); + Lock[] locks = null; ++ boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false); final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future; if (requiresViewUpdate) { mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis()); - while (true) - { - if (TEST_FAIL_MV_LOCKS_COUNT == 0) - lock = ViewManager.acquireLockFor(mutation.key().getKey()); - else - TEST_FAIL_MV_LOCKS_COUNT--; - if (lock == null) + // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock() + Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds(); + Iterator<UUID> idIterator = columnFamilyIds.iterator(); - locks = new Lock[columnFamilyIds.size()]; + ++ locks = new Lock[columnFamilyIds.size()]; + for (int i = 0; i < columnFamilyIds.size(); i++) + { + UUID cfid = idIterator.next(); + int lockKey = Objects.hash(mutation.key().getKey(), cfid); - Lock lock = ViewManager.acquireLockFor(lockKey); - if (lock == null) ++ while (true) { - // we will either time out or retry, so release all acquired locks - for (int j = 0; j < i; j++) - locks[j].unlock(); - // avoid throwing a WTE during commitlog replay - if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) ++ Lock lock = null; + - // avoid throwing a WTE during commitlog replay - if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) ++ if (TEST_FAIL_MV_LOCKS_COUNT == 0) ++ lock = ViewManager.acquireLockFor(lockKey); ++ else ++ TEST_FAIL_MV_LOCKS_COUNT--; ++ ++ if (lock == null) { - logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name); - logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); -- Tracing.trace("Could not acquire MV lock"); -- if (future != null) - future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); ++ // avoid throwing a WTE during commitlog replay ++ if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { - future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); - return mark; ++ for (int j = 0; j < i; j++) ++ locks[j].unlock(); ++ ++ logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name); ++ Tracing.trace("Could not acquire MV lock"); ++ if (future != null) ++ { ++ future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); ++ return mark; ++ } ++ else ++ throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } - else ++ else if (isDeferrable) + { - throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); - } - } - else if (isDeferrable) - { - //This view update can't happen right now. so rather than keep this thread busy - // we will re-apply ourself to the queue and try again later - StageManager.getStage(Stage.MUTATION).execute(() -> - apply(mutation, writeCommitLog, true, isClReplay, mark) - ); ++ for (int j = 0; j < i; j++) ++ locks[j].unlock(); + - return mark; - } - else - { - // Retry lock on same thread, if mutation is not deferrable. - // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish - // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers - // being blocked by waiting for futures which will never be processed as all workers are blocked - try - { - // Wait a little bit before retrying to lock - Thread.sleep(10); ++ // This view update can't happen right now. so rather than keep this thread busy ++ // we will re-apply ourself to the queue and try again later ++ StageManager.getStage(Stage.MUTATION).execute(() -> ++ apply(mutation, writeCommitLog, true, isClReplay, mark) ++ ); ++ ++ return mark; + } - catch (InterruptedException e) + else - throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + { - // Just continue ++ // Retry lock on same thread, if mutation is not deferrable. ++ // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish ++ // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers ++ // being blocked by waiting for futures which will never be processed as all workers are blocked ++ try ++ { ++ // Wait a little bit before retrying to lock ++ Thread.sleep(10); ++ } ++ catch (InterruptedException e) ++ { ++ // Just continue ++ } ++ continue; + } - // continue in while loop } - } - else - { - long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get(); - if (!isClReplay) + else { - // This view update can't happen right now. so rather than keep this thread busy - // we will re-apply ourself to the queue and try again later - StageManager.getStage(Stage.MUTATION).execute(() -> - apply(mutation, writeCommitLog, true, isClReplay, mark) - ); - - return mark; - for (UUID cfid : mutation.getColumnFamilyIds()) - columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS); ++ locks[i] = lock; } - } - else - { - locks[i] = lock; + break; } } + + long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get(); + if (!isClReplay) + { + for(UUID cfid : columnFamilyIds) + columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS); + } } int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group opGroup = writeOrder.start()) @@@ -561,11 -559,8 +620,12 @@@ } finally { - if (lock != null) - lock.unlock(); + if (locks != null) + { + for (Lock lock : locks) - lock.unlock(); ++ if (lock != null) ++ lock.unlock(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a1f1c81/src/java/org/apache/cassandra/db/Mutation.java ----------------------------------------------------------------------
