This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 028b6772231e18e2357dfff1fa04477efa377730 Merge: 00fff3e 62f0280 Author: Ariel Weisberg <[email protected]> AuthorDate: Tue Jan 15 11:03:53 2019 -0500 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 26 +++++++++++++++++++++- .../apache/cassandra/service/CassandraDaemon.java | 16 ++++++++----- .../apache/cassandra/service/StorageService.java | 23 +++++++++++++++++++ .../cassandra/service/StorageServiceMBean.java | 5 +++++ 5 files changed, 64 insertions(+), 7 deletions(-) diff --cc CHANGES.txt index ba44dcf,477afd8..5af5b64 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -343,8 -2,11 +343,9 @@@ * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829) * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870) Merged from 3.0: + * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905) - * Counters fail to increment in 2.1/2.2 to 3.X mixed version clusters (CASSANDRA-14958) * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554) * Fix cassandra-stress write hang with default options (CASSANDRA-14616) - * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919) * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909) * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588) * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894) diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 1b3b2a6,812659c..ddf6475 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -777,7 -802,7 +777,7 @@@ public final class SystemKeyspac /** * This method is used to update the System Keyspace with the new tokens for this node -- */ ++ */ public static synchronized void updateTokens(Collection<Token> tokens) { assert !tokens.isEmpty() : "removeEndpoint should be used instead"; @@@ -1275,49 -1277,57 +1275,73 @@@ executeInternal(cql, keyspace, table); } + /** + * Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop) + */ + public static void clearSizeEstimates(String keyspace) + { + String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES); + executeInternal(cql, keyspace); + } + + /** + * @return A multimap from keyspace to table for all tables with entries in size estimates + */ - + public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates() + { + SetMultimap<String, String> keyspaceTableMap = HashMultimap.create(); + String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES); + UntypedResultSet rs = executeInternal(cql); + for (UntypedResultSet.Row row : rs) + { + keyspaceTableMap.put(row.getString("keyspace_name"), row.getString("table_name")); + } - + return keyspaceTableMap; + } + - public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges) + public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> completedTransientRanges) { - String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?"; - Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size()); - for (Range<Token> range : completedRanges) - { - rangesToUpdate.add(rangeToBytes(range)); - } - executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace); + String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, transient_ranges = transient_ranges + ? WHERE keyspace_name = ?"; + executeInternal(format(cql, AVAILABLE_RANGES_V2), + completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()), + completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()), + keyspace); } - public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) + /** + * List of the streamed ranges, where transientness is encoded based on the source, where range was streamed from. + */ + public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner) { - Set<Range<Token>> result = new HashSet<>(); String query = "SELECT * FROM system.%s WHERE keyspace_name=?"; - UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace); + UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace); + + ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>(); + ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>(); for (UntypedResultSet.Row row : rs) { - Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); - for (ByteBuffer rawRange : rawRanges) - { - result.add(byteBufferToRange(rawRange, partitioner)); - } + Optional.ofNullable(row.getSet("full_ranges", BytesType.instance)) + .ifPresent(full_ranges -> full_ranges.stream() + .map(buf -> byteBufferToRange(buf, partitioner)) + .forEach(full::add)); + Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance)) + .ifPresent(transient_ranges -> transient_ranges.stream() + .map(buf -> byteBufferToRange(buf, partitioner)) + .forEach(trans::add)); + } + return new AvailableRanges(full.build(), trans.build()); + } + + public static class AvailableRanges + { + public Set<Range<Token>> full; + public Set<Range<Token>> trans; + + private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans) + { + this.full = full; + this.trans = trans; } - return ImmutableSet.copyOf(result); } public static void resetAvailableRanges() diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index c8fddab,b593190..af781d5 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -326,9 -331,23 +326,19 @@@ public class CassandraDaemo // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293) StorageService.instance.populateTokenMetadata(); - // migrate any legacy (pre-3.0) hints from system.hints table into the new store - new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate(); - - // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format) - LegacyBatchlogMigrator.migrate(); - SystemKeyspace.finishStartup(); + + // Clean up system.size_estimates entries left lying around from missed keyspace drops (CASSANDRA-14905) + StorageService.instance.cleanupSizeEstimates(); + + // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF + // set cassandra.size_recorder_interval to 0 to disable + int sizeRecorderInterval = Integer.getInteger("cassandra.size_recorder_interval", 5 * 60); + if (sizeRecorderInterval > 0) + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS); + + ActiveRepairService.instance.start(); + // Prepared statements QueryProcessor.preloadPreparedStatement(); @@@ -409,20 -428,12 +419,14 @@@ // due to scheduling errors or race conditions ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES); - // Thrift - InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); - int rpcPort = DatabaseDescriptor.getRpcPort(); - int listenBacklog = DatabaseDescriptor.getRpcListenBacklog(); - thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); + // schedule periodic recomputation of speculative retry thresholds + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay( + () -> Keyspace.all().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)), + DatabaseDescriptor.getReadRpcTimeout(), + DatabaseDescriptor.getReadRpcTimeout(), + TimeUnit.MILLISECONDS + ); - // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF - // set cassandra.size_recorder_interval to 0 to disable - int sizeRecorderInterval = Integer.getInteger("cassandra.size_recorder_interval", 5 * 60); - if (sizeRecorderInterval > 0) - ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS); - // Native transport nativeTransportService = new NativeTransportService(); diff --cc src/java/org/apache/cassandra/service/StorageService.java index 090b10b,7adefe2..ca453d0 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -3712,6 -3386,28 +3713,28 @@@ public class StorageService extends Not FBUtilities.waitOnFuture(ScheduledExecutors.optionalTasks.submit(SizeEstimatesRecorder.instance)); } + public void cleanupSizeEstimates() + { + SetMultimap<String, String> sizeEstimates = SystemKeyspace.getTablesWithSizeEstimates(); + + for (Entry<String, Collection<String>> tablesByKeyspace : sizeEstimates.asMap().entrySet()) + { + String keyspace = tablesByKeyspace.getKey(); + if (!Schema.instance.getKeyspaces().contains(keyspace)) + { + SystemKeyspace.clearSizeEstimates(keyspace); + } + else + { + for (String table : tablesByKeyspace.getValue()) + { - if (!Schema.instance.hasCF(Pair.create(keyspace, table))) ++ if (Schema.instance.getTableMetadataRef(keyspace, table) == null) + SystemKeyspace.clearSizeEstimates(keyspace, table); + } + } + } + } + /** * @param allowIndexes Allow index CF names to be passed in * @param autoAddIndexes Automatically add secondary indexes if a CF has them --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
