This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 028b6772231e18e2357dfff1fa04477efa377730
Merge: 00fff3e 62f0280
Author: Ariel Weisberg <[email protected]>
AuthorDate: Tue Jan 15 11:03:53 2019 -0500

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java    | 26 +++++++++++++++++++++-
 .../apache/cassandra/service/CassandraDaemon.java  | 16 ++++++++-----
 .../apache/cassandra/service/StorageService.java   | 23 +++++++++++++++++++
 .../cassandra/service/StorageServiceMBean.java     |  5 +++++
 5 files changed, 64 insertions(+), 7 deletions(-)

diff --cc CHANGES.txt
index ba44dcf,477afd8..5af5b64
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -343,8 -2,11 +343,9 @@@
   * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
   * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
  Merged from 3.0:
+  * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
 - * Counters fail to increment in 2.1/2.2 to 3.X mixed version clusters (CASSANDRA-14958)
   * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
   * Fix cassandra-stress write hang with default options (CASSANDRA-14616)
 - * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
   * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909)
   * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588)
   * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1b3b2a6,812659c..ddf6475
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -777,7 -802,7 +777,7 @@@ public final class SystemKeyspac
  
      /**
       * This method is used to update the System Keyspace with the new tokens 
for this node
--    */
++     */
      public static synchronized void updateTokens(Collection<Token> tokens)
      {
          assert !tokens.isEmpty() : "removeEndpoint should be used instead";
@@@ -1275,49 -1277,57 +1275,73 @@@
          executeInternal(cql, keyspace, table);
      }
  
+     /**
+      * Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop)
+      */
+     public static void clearSizeEstimates(String keyspace)
+     {
+         String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+         executeInternal(cql, keyspace);
+     }
+ 
+     /**
+      * @return A multimap from keyspace to table for all tables with entries in size estimates
+      */
 -
+     public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates()
+     {
+         SetMultimap<String, String> keyspaceTableMap = HashMultimap.create();
+         String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+         UntypedResultSet rs = executeInternal(cql);
+         for (UntypedResultSet.Row row : rs)
+         {
+             keyspaceTableMap.put(row.getString("keyspace_name"), row.getString("table_name"));
+         }
 -
+         return keyspaceTableMap;
+     }
+ 
 -    public static synchronized void updateAvailableRanges(String keyspace, 
Collection<Range<Token>> completedRanges)
 +    public static synchronized void updateAvailableRanges(String keyspace, 
Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> 
completedTransientRanges)
      {
 -        String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE 
keyspace_name = ?";
 -        Set<ByteBuffer> rangesToUpdate = new 
HashSet<>(completedRanges.size());
 -        for (Range<Token> range : completedRanges)
 -        {
 -            rangesToUpdate.add(rangeToBytes(range));
 -        }
 -        executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, 
keyspace);
 +        String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, 
transient_ranges = transient_ranges + ? WHERE keyspace_name = ?";
 +        executeInternal(format(cql, AVAILABLE_RANGES_V2),
 +                        
completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
 +                        
completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
 +                        keyspace);
      }
  
 -    public static synchronized Set<Range<Token>> getAvailableRanges(String 
keyspace, IPartitioner partitioner)
 +    /**
 +     * List of the streamed ranges, where transientness is encoded based on 
the source, where range was streamed from.
 +     */
 +    public static synchronized AvailableRanges getAvailableRanges(String 
keyspace, IPartitioner partitioner)
      {
 -        Set<Range<Token>> result = new HashSet<>();
          String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
 -        UntypedResultSet rs = executeInternal(String.format(query, 
AVAILABLE_RANGES), keyspace);
 +        UntypedResultSet rs = executeInternal(format(query, 
AVAILABLE_RANGES_V2), keyspace);
 +
 +        ImmutableSet.Builder<Range<Token>> full = new 
ImmutableSet.Builder<>();
 +        ImmutableSet.Builder<Range<Token>> trans = new 
ImmutableSet.Builder<>();
          for (UntypedResultSet.Row row : rs)
          {
 -            Set<ByteBuffer> rawRanges = row.getSet("ranges", 
BytesType.instance);
 -            for (ByteBuffer rawRange : rawRanges)
 -            {
 -                result.add(byteBufferToRange(rawRange, partitioner));
 -            }
 +            Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
 +                    .ifPresent(full_ranges -> full_ranges.stream()
 +                            .map(buf -> byteBufferToRange(buf, partitioner))
 +                            .forEach(full::add));
 +            Optional.ofNullable(row.getSet("transient_ranges", 
BytesType.instance))
 +                    .ifPresent(transient_ranges -> transient_ranges.stream()
 +                            .map(buf -> byteBufferToRange(buf, partitioner))
 +                            .forEach(trans::add));
 +        }
 +        return new AvailableRanges(full.build(), trans.build());
 +    }
 +
 +    public static class AvailableRanges
 +    {
 +        public Set<Range<Token>> full;
 +        public Set<Range<Token>> trans;
 +
 +        private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> 
trans)
 +        {
 +            this.full = full;
 +            this.trans = trans;
          }
 -        return ImmutableSet.copyOf(result);
      }
  
      public static void resetAvailableRanges()
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index c8fddab,b593190..af781d5
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -326,9 -331,23 +326,19 @@@ public class CassandraDaemo
          // Re-populate token metadata after commit log recover (new peers 
might be loaded onto system keyspace #10293)
          StorageService.instance.populateTokenMetadata();
  
 -        // migrate any legacy (pre-3.0) hints from system.hints table into 
the new store
 -        new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), 
DatabaseDescriptor.getMaxHintsFileSize()).migrate();
 -
 -        // migrate any legacy (pre-3.0) batch entries from system.batchlog to 
system.batches (new table format)
 -        LegacyBatchlogMigrator.migrate();
 -
          SystemKeyspace.finishStartup();
+ 
+         // Clean up system.size_estimates entries left lying around from missed keyspace drops (CASSANDRA-14905)
+         StorageService.instance.cleanupSizeEstimates();
+ 
+         // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF
+         // set cassandra.size_recorder_interval to 0 to disable
+         int sizeRecorderInterval = Integer.getInteger("cassandra.size_recorder_interval", 5 * 60);
+         if (sizeRecorderInterval > 0)
+             ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS);
+ 
 +        ActiveRepairService.instance.start();
 +
          // Prepared statements
          QueryProcessor.preloadPreparedStatement();
  
@@@ -409,20 -428,12 +419,14 @@@
          // due to scheduling errors or race conditions
          
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(),
 5, 1, TimeUnit.MINUTES);
  
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        // schedule periodic recomputation of speculative retry thresholds
 +        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(
 +            () -> Keyspace.all().forEach(k -> 
k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)),
 +            DatabaseDescriptor.getReadRpcTimeout(),
 +            DatabaseDescriptor.getReadRpcTimeout(),
 +            TimeUnit.MILLISECONDS
 +        );
  
-         // schedule periodic dumps of table size estimates into 
SystemKeyspace.SIZE_ESTIMATES_CF
-         // set cassandra.size_recorder_interval to 0 to disable
-         int sizeRecorderInterval = 
Integer.getInteger("cassandra.size_recorder_interval", 5 * 60);
-         if (sizeRecorderInterval > 0)
-             
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
 30, sizeRecorderInterval, TimeUnit.SECONDS);
- 
          // Native transport
          nativeTransportService = new NativeTransportService();
  
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 090b10b,7adefe2..ca453d0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3712,6 -3386,28 +3713,28 @@@ public class StorageService extends Not
          
FBUtilities.waitOnFuture(ScheduledExecutors.optionalTasks.submit(SizeEstimatesRecorder.instance));
      }
  
+     public void cleanupSizeEstimates()
+     {
+         SetMultimap<String, String> sizeEstimates = SystemKeyspace.getTablesWithSizeEstimates();
+ 
+         for (Entry<String, Collection<String>> tablesByKeyspace : sizeEstimates.asMap().entrySet())
+         {
+             String keyspace = tablesByKeyspace.getKey();
+             if (!Schema.instance.getKeyspaces().contains(keyspace))
+             {
+                 SystemKeyspace.clearSizeEstimates(keyspace);
+             }
+             else
+             {
+                 for (String table : tablesByKeyspace.getValue())
+                 {
 -                    if (!Schema.instance.hasCF(Pair.create(keyspace, table)))
++                    if (Schema.instance.getTableMetadataRef(keyspace, table) == null)
+                         SystemKeyspace.clearSizeEstimates(keyspace, table);
+                 }
+             }
+         }
+     }
+ 
      /**
       * @param allowIndexes Allow index CF names to be passed in
       * @param autoAddIndexes Automatically add secondary indexes if a CF has 
them


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to