Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489be961
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489be961
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489be961

Branch: refs/heads/trunk
Commit: 489be961c945e4330a9426d21b2bb903cc1d3a54
Parents: 73547a3 48abc03
Author: Paulo Motta <[email protected]>
Authored: Thu Dec 15 16:46:32 2016 -0200
Committer: Paulo Motta <[email protected]>
Committed: Thu Dec 15 16:49:10 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 89 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 src/java/org/apache/cassandra/hints/Hint.java   | 40 ++++++---
 .../apache/cassandra/hints/HintVerbHandler.java |  4 +-
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  6 +-
 8 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3db0179,63e095d..fa0c94a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
-3.0.11
+3.10
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in
+   CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..)
+   (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+   closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+   to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+   is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction
+   Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
  * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
  * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
  * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index bd58f75,3715995..d9f8f62
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -478,63 -435,35 +476,67 @@@ public class Keyspac
          if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
              throw new RuntimeException("Testing write failures");

+         Lock[] locks = null;
+
          boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
-         final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+         // If apply is not deferrable, no future is required and this method always returns null
+         if (isDeferrable && future == null)
+         {
+             future = new CompletableFuture<>();
+         }

-         Lock lock = null;
          if (requiresViewUpdate)
          {
              mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
-             while (true)
-             {
-                 if (TEST_FAIL_MV_LOCKS_COUNT == 0)
-                     lock = ViewManager.acquireLockFor(mutation.key().getKey());
-                 else
-                     TEST_FAIL_MV_LOCKS_COUNT--;

-                 if (lock == null)
+             // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
+             Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
+             Iterator<UUID> idIterator = columnFamilyIds.iterator();
+
+             locks = new Lock[columnFamilyIds.size()];
+             for (int i = 0; i < columnFamilyIds.size(); i++)
+             {
+                 UUID cfid = idIterator.next();
+                 int lockKey = Objects.hash(mutation.key().getKey(), cfid);
+                 while (true)
                  {
-                     //throw WTE only if request is droppable
-                     if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                     Lock lock = null;
+
+                     if (TEST_FAIL_MV_LOCKS_COUNT == 0)
+                         lock = ViewManager.acquireLockFor(lockKey);
+                     else
+                         TEST_FAIL_MV_LOCKS_COUNT--;
+
+                     if (lock == null)
                      {
-                         // avoid throwing a WTE during commitlog replay
-                         if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
-                             logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
-                         Tracing.trace("Could not acquire MV lock");
-                         if (future != null)
++                        // throw a WTE only if the request is droppable
++                        if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
                          {
-                             future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
+                             for (int j = 0; j < i; j++)
+                                 locks[j].unlock();
+
+                             logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
+                             Tracing.trace("Could not acquire MV lock");
+                             if (future != null)
+                             {
+                                 future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                                 return mark;
++                                return future;
+                             }
+                             else
++                                throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                         }
+                         else if (isDeferrable)
+                         {
+                             for (int j = 0; j < i; j++)
+                                 locks[j].unlock();
+
+                             // This view update can't happen right now, so rather than keeping this thread busy
+                             // we re-apply ourselves to the queue and try again later
++                            final CompletableFuture<?> mark = future;
+                             StageManager.getStage(Stage.MUTATION).execute(() ->
-                                 apply(mutation, writeCommitLog, true, isClReplay, mark)
++                                apply(mutation, writeCommitLog, true, isDroppable, true, mark)
+                             );
-
-                             return mark;
+                             return future;
                          }
                          else
                          {
@@@ -561,13 -512,6 +563,15 @@@
                              break;
                          }
                      }
+
+                 long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-                 if (!isClReplay)
++                // Metrics are only collected for droppable write operations.
++                // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured.
++                if (isDroppable)
+                 {
+                     for (UUID cfid : columnFamilyIds)
+                         columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
+                 }
              }

          int nowInSec = FBUtilities.nowInSeconds();
          try (OpOrder.Group opGroup = writeOrder.start())
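[Editor's note] The Keyspace.java hunk above replaces the old single lock per partition key with one striped lock per (partition key, table id) pair, each taken with tryLock() so that acquisition order cannot deadlock; on contention everything already held is released and the write is either failed (droppable) or re-queued (deferrable). A minimal, self-contained sketch of that pattern follows. The class StripedViewLockExample and method tryAcquireAll are illustrative names, not part of the Cassandra codebase; Guava's Striped is assumed on the classpath, as it is in Cassandra's ViewManager.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.UUID;
    import java.util.concurrent.locks.Lock;

    import com.google.common.util.concurrent.Striped;

    // Illustrative sketch of the per-(partition, table) try-lock scheme above.
    public class StripedViewLockExample
    {
        private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(1024);

        // Try to take one lock per (partition key, table id) pair; on any
        // failure, release everything already held and return null so the
        // caller can defer the write and retry later, as Keyspace.apply() does.
        static List<Lock> tryAcquireAll(byte[] partitionKey, List<UUID> tableIds)
        {
            List<Lock> held = new ArrayList<>(tableIds.size());
            for (UUID cfId : tableIds)
            {
                // One lock key per (partition, table), mirroring
                // Objects.hash(mutation.key().getKey(), cfid) in the hunk above.
                int lockKey = Objects.hash(Arrays.hashCode(partitionKey), cfId);
                Lock lock = LOCKS.get(lockKey);
                if (!lock.tryLock())
                {
                    for (Lock l : held)
                        l.unlock();
                    return null; // contended: caller re-queues instead of blocking
                }
                held.add(lock);
            }
            return held; // caller must unlock all of these when done
        }
    }

Because every writer uses tryLock() and backs off completely on failure, no globally consistent acquisition order is needed, which is what the hunk's "the order of lock acquisition doesn't matter" comment refers to.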
" + - "The end of segment marker should be zero.", - offset, reader.getPath()); - } - return -1; - } - else if (end < offset || end > reader.length()) - { - handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", - offset, reader.getPath()); - return -1; + public void runMayThrow() + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (commitLogReplayer.pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(update.metadata().cfId) == null) + continue; // dropped + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the commit log segment position + if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation))) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(update); + commitLogReplayer.replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + - try - { - Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); - } - catch (ExecutionException e) - { - throw Throwables.propagate(e.getCause()); - } - ++ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false); + commitLogReplayer.keyspacesReplayed.add(keyspace); + } + } + }; + return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize); } - return end; + } + + /** + * A set of known safe-to-discard commit log replay positions, based on + * the range covered by on disk sstables and those prior to the most recent truncation record + */ + public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt) + { + IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>(); + for (SSTableReader reader : onDisk) + builder.addAll(reader.getSSTableMetadata().commitLogIntervals); + + if (truncatedAt != null) + builder.add(CommitLogPosition.NONE, truncatedAt); + return builder.build(); + } + + /** + * Find the earliest commit log position that is not covered by the known flushed ranges for some table. + * + * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the + * given table was constructed* and hence we can start replay from the end of that interval. + * + * If such an interval is not known, we must replay from the beginning. + * + * * This is not true only until if the very first flush of a table stalled or failed, while the second or latter + * succeeded. The chances of this happening are at most very low, and if the assumption does prove to be + * incorrect during replay there is little chance that the affected deployment is in production. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 8fe5a49,b6b8387..6c60b74
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -192,17 -185,8 +193,14 @@@ public class StreamReceiveTask extends
                  {
                      try (UnfilteredRowIterator rowIterator = scanner.next())
                      {
-                         // MV *can* be applied unsafe as we flush below before transaction is done.
-                         ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
+                         Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+
+                         // MV *can* be applied unsafe if there is no CDC on the CFS, as we flush below
+                         // before the transaction is done.
+                         //
+                         // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                         // so they get archived into the cdc_raw folder.
-                         if (hasCDC)
-                             m.apply();
-                         else
-                             m.applyUnsafe();
++                        ks.apply(m, hasCDC, true, false);
                      }
                  }
              }
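[Editor's note] The StreamReceiveTask hunk folds the old hasCDC branch into the writeCommitLog argument of Keyspace.apply(), so streamed mutations take the same lock-aware path as regular writes. A hedged sketch of that call site follows; the helper applyStreamedMutation is hypothetical, and the parameter roles are read from the four-argument apply(mutation, writeCommitLog, updateIndexes, isDroppable) overload as it is used in the hunks above.

    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.db.Mutation;

    // Illustrative helper only; mirrors ks.apply(m, hasCDC, true, false) above.
    final class StreamApplyExample
    {
        static void applyStreamedMutation(Keyspace ks, Mutation m, boolean hasCDC)
        {
            // CDC tables must write through the commit log so the data is
            // archived into cdc_raw; for everything else the flush that follows
            // streaming makes the commit log unnecessary. Either way the write
            // is not droppable: streamed data must not be lost to a timeout.
            ks.apply(m, /* writeCommitLog */ hasCDC,
                        /* updateIndexes  */ true,
                        /* isDroppable    */ false);
        }
    }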
