This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 1b4e1cc9303414f91535dab10f6342e1c1c6b8ac
Merge: 7214794 11cb810
Author: Ekaterina Dimitrova <[email protected]>
AuthorDate: Tue Jan 19 17:59:23 2021 -0500

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   8 +
 .../cassandra/config/DatabaseDescriptor.java       |  14 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 249 ++++++++++++---------
 src/java/org/apache/cassandra/db/Memtable.java     |  13 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |   2 +-
 .../apache/cassandra/utils/memory/HeapPool.java    |   2 +-
 .../cassandra/utils/memory/MemtableAllocator.java  | 121 ++++++++--
 .../cassandra/utils/memory/MemtableCleaner.java    |  40 ++++
 .../utils/memory/MemtableCleanerThread.java        |  67 +++++-
 .../cassandra/utils/memory/MemtablePool.java       |  35 ++-
 .../apache/cassandra/utils/memory/NativePool.java  |   2 +-
 .../apache/cassandra/utils/memory/SlabPool.java    |   2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  14 +-
 .../org/apache/cassandra/db/NativeCellTest.java    |  16 +-
 .../utils/memory/MemtableCleanerThreadTest.java    | 187 ++++++++++++++++
 .../utils/memory/NativeAllocatorTest.java          | 204 ++++++++++-------
 17 files changed, 709 insertions(+), 268 deletions(-)

diff --cc CHANGES.txt
index 0d7b92b,29723d4..3430527
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,12 +1,52 @@@
 -3.11.10
 +4.0-beta5
 + * Correct memtable on-heap size calculations to match actual use 
(CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair 
(CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id 
(CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols 
(CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, 
in the cassandra-all pom (CASSANDRA-16303)
   * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 - * Rate limit validation compactions using compaction_throughput_mb_per_sec 
(CASSANDRA-16161)
 - * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
 +Merged from 3.11:
  Merged from 3.0:
+  * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
   * Improve empty hint file handling during startup (CASSANDRA-16162)
 - * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
   * Fix skipping on pre-3.0 created compact storage sstables due to missing 
primary key liveness (CASSANDRA-16226)
 + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
 +
 +4.0-beta4
 + * DROP COMPACT STORAGE should invalidate prepared statements still using 
CompactTableMetadata (CASSANDRA-16361)
 + * Update default num_tokens to 16 and 
allocate_tokens_for_local_replication_factor to 3 (CASSANDRA-13701)
 + * Remove use of String.intern() (CASSANDRA-15810)
 + * Fix the missing bb position in ByteBufferAccessor.getUnsignedShort 
(CASSANDRA-16249)
 + * Make sure OOM errors are rethrown on truncation failure (CASSANDRA-16254)
 + * Send back client warnings when creating too many tables/keyspaces 
(CASSANDRA-16309)
 + * Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143)
 + * Add generatetokens script for offline token allocation strategy generation 
(CASSANDRA-16205)
 + * Remove Windows scripts (CASSANDRA-16171)
 + * Improve checksumming and compression in protocol V5 (CASSANDRA-15299)
 + * Optimised repair streaming improvements (CASSANDRA-16274)
 + * Update jctools dependency to 3.1.0 (CASSANDRA-16255)
 + * 'SSLEngine closed already' exception on failed outbound connection 
(CASSANDRA-16277)
 + * Drain and/or shutdown might throw because of slow messaging service 
shutdown (CASSANDRA-16276)
 + * Upgrade JNA to 5.6.0, dropping support for <=glibc-2.6 systems 
(CASSANDRA-16212)
 + * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246)
 + * Ensure that CacheMetrics.requests is picked up by the metric reporter 
(CASSANDRA-16228)
 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
 + * Produce consistent tombstone for reads to avoid digest mismatch 
(CASSANDRA-15369)
 + * Fix SSTableloader issue when restoring a table named backups 
(CASSANDRA-16235)
 + * Invalid serialized size for responses caused by increasing message time by 
1ms which caused extra bytes in size calculation (CASSANDRA-16103)
 + * Throw BufferOverflowException from DataOutputBuffer for better visibility 
(CASSANDRA-16214)
 + * TLS connections to the storage port on a node without server encryption 
configured causes java.io.IOException accessing missing keystore 
(CASSANDRA-16144)
 + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
 + * When a table attempts to clean up metrics, it was cleaning up all global 
table metrics (CASSANDRA-16095)
 + * Bring back the accepted encryption protocols list as configurable option 
(CASSANDRA-13325)
 + * DigestResolver.getData throws AssertionError since dataResponse is null 
(CASSANDRA-16097)
 + * Cannot replace_address /X because it doesn't exist in gossip 
(CASSANDRA-16213)
 + * cqlsh row_id resets on page boundaries (CASSANDRA-16160)
 +Merged from 3.11:
 + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
 +Merged from 3.0:
   * Extend the exclusion of replica filtering protection to other indices 
instead of just SASI (CASSANDRA-16311)
   * Synchronize transaction logs for JBOD (CASSANDRA-16225)
   * Fix the counting of cells per partition (CASSANDRA-16259)
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 304a004,aa9a4cc..7ddba08
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1502,14 -1389,9 +1502,14 @@@ public class DatabaseDescripto
          return System.getProperty(Config.PROPERTY_PREFIX + 
"allocate_tokens_for_keyspace", conf.allocate_tokens_for_keyspace);
      }
  
 +    public static Integer getAllocateTokensForLocalRf()
 +    {
 +        return conf.allocate_tokens_for_local_replication_factor;
 +    }
 +
      public static Collection<String> tokensFromString(String tokenString)
      {
-         List<String> tokens = new ArrayList<String>();
+         List<String> tokens = new ArrayList<>();
          if (tokenString != null)
              for (String token : StringUtils.split(tokenString, ','))
                  tokens.add(token.trim());
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ceacd0d,83241e5..d4818ac
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -107,15 -137,14 +108,15 @@@ public class ColumnFamilyStore implemen
      are finished. By having flushExecutor size the same size as each of the 
perDiskflushExecutors we make sure we can
      have that many flushes going at the same time.
      */
-     private static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                                                               
            Stage.KEEP_ALIVE_SECONDS,
-                                                                               
            TimeUnit.SECONDS,
-                                                                               
            new LinkedBlockingQueue<Runnable>(),
-                                                                               
            new NamedThreadFactory("MemtableFlushWriter"),
-                                                                               
            "internal");
+     private static final ThreadPoolExecutor flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
 -                                                                              
               StageManager.KEEPALIVE,
++                                                                              
               Stage.KEEP_ALIVE_SECONDS,
+                                                                               
               TimeUnit.SECONDS,
+                                                                               
               new LinkedBlockingQueue<>(),
+                                                                               
               new NamedThreadFactory("MemtableFlushWriter"),
+                                                                               
               "internal");
  
      private static final ExecutorService [] perDiskflushExecutors = new 
ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
 +
      static
      {
          for (int i = 0; i < 
DatabaseDescriptor.getAllDataFileLocations().length; i++)
@@@ -130,29 -159,36 +131,29 @@@
      }
  
      // post-flush executor is single threaded to provide guarantee that any 
flush Future on a CF will never return until prior flushes have completed
-     private static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor(1,
-                                                                               
                Stage.KEEP_ALIVE_SECONDS,
-                                                                               
                TimeUnit.SECONDS,
-                                                                               
                new LinkedBlockingQueue<Runnable>(),
-                                                                               
                new NamedThreadFactory("MemtablePostFlush"),
-                                                                               
                "internal");
- 
-     private static final ExecutorService reclaimExecutor = new 
JMXEnabledThreadPoolExecutor(1,
-                                                                               
              Stage.KEEP_ALIVE_SECONDS,
-                                                                               
              TimeUnit.SECONDS,
-                                                                               
              new LinkedBlockingQueue<Runnable>(),
-                                                                               
              new NamedThreadFactory("MemtableReclaimMemory"),
-                                                                               
              "internal");
+     private static final ThreadPoolExecutor postFlushExecutor = new 
JMXEnabledThreadPoolExecutor(1,
 -                                                                              
                   StageManager.KEEPALIVE,
++                                                                              
                   Stage.KEEP_ALIVE_SECONDS,
+                                                                               
                   TimeUnit.SECONDS,
+                                                                               
                   new LinkedBlockingQueue<>(),
+                                                                               
                   new NamedThreadFactory("MemtablePostFlush"),
+                                                                               
                   "internal");
+ 
+     private static final ThreadPoolExecutor reclaimExecutor = new 
JMXEnabledThreadPoolExecutor(1,
 -                                                                              
                 StageManager.KEEPALIVE,
++                                                                              
                 Stage.KEEP_ALIVE_SECONDS,
+                                                                               
                 TimeUnit.SECONDS,
+                                                                               
                 new LinkedBlockingQueue<>(),
+                                                                               
                 new NamedThreadFactory("MemtableReclaimMemory"),
+                                                                               
                 "internal");
  
 -    private static final String[] COUNTER_NAMES = new String[]{"raw", 
"count", "error", "string"};
 +    private static final String[] COUNTER_NAMES = new String[]{"table", 
"count", "error", "value"};
      private static final String[] COUNTER_DESCS = new String[]
 -                                                  { "partition key in raw hex 
bytes",
 -                                                    "value of this partition 
for given sampler",
 -                                                    "value is within the 
error bounds plus or minus of this",
 -                                                    "the partition key turned 
into a human readable format" };
 +    { "keyspace.tablename",
 +      "number of occurances",
 +      "error bounds",
 +      "value" };
      private static final CompositeType COUNTER_COMPOSITE_TYPE;
 -    private static final TabularType COUNTER_TYPE;
 -
 -    private static final String[] SAMPLER_NAMES = new String[]{"cardinality", 
"partitions"};
 -    private static final String[] SAMPLER_DESCS = new String[]
 -                                                  { "cardinality of 
partitions",
 -                                                    "list of counter results" 
};
  
      private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
 -    private static final CompositeType SAMPLING_RESULT;
  
      public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated";
      public static final String SNAPSHOT_DROP_PREFIX = "dropped";
@@@ -627,12 -639,12 +624,12 @@@
       * Removes unnecessary files from the cf directory at startup: these 
include temp files, orphans, zero-length files
       * and compacted sstables. Files that cannot be recognized will be 
ignored.
       */
 -    public static void  scrubDataDirectories(CFMetaData metadata) throws 
StartupException
 +    public static void  scrubDataDirectories(TableMetadata metadata) throws 
StartupException
      {
 -        Directories directories = new Directories(metadata, 
initialDirectories);
 +        Directories directories = new Directories(metadata);
          Set<File> cleanedDirectories = new HashSet<>();
  
-          // clear ephemeral snapshots that were not properly cleared last 
session (CASSANDRA-7357)
+         // clear ephemeral snapshots that were not properly cleared last 
session (CASSANDRA-7357)
          clearEphemeralSnapshots(directories);
  
          directories.removeTemporaryDirectories();
@@@ -678,7 -687,7 +675,7 @@@
          }
  
          // cleanup incomplete saved caches
-         Pattern tmpCacheFilePattern = Pattern.compile(metadata.keyspace + "-" 
+ metadata.name + "-(Key|Row)Cache.*\\.tmp$");
 -        Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + 
metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
++        Pattern tmpCacheFilePattern = Pattern.compile(metadata.keyspace + '-' 
+ metadata.name + "-(Key|Row)Cache.*\\.tmp$");
          File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
  
          if (dir.exists())
@@@ -1645,8 -1691,8 +1682,8 @@@
          return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached,
                                                                    nowInSec,
                                                                    
filter.selectsAllPartition(),
 -                                                                  
metadata.enforceStrictLiveness()))
 +                                                                  
enforceStrictLiveness))
-                 || filter.isFullyCoveredBy(cached);
+                || filter.isFullyCoveredBy(cached);
      }
  
      public int gcBefore(int nowInSec)
@@@ -2104,10 -2131,10 +2141,10 @@@
      {
          if (CacheService.instance.counterCache.getCapacity() == 0L) // 
counter cache disabled.
              return;
 -        
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName,
 partitionKey, clustering, column, path), clockAndCount);
 +        
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata(), 
partitionKey, clustering, column, path), clockAndCount);
      }
  
-     public void forceMajorCompaction() throws InterruptedException, 
ExecutionException
+     public void forceMajorCompaction()
      {
          forceMajorCompaction(false);
      }
@@@ -2226,31 -2249,21 +2259,31 @@@
                  now = Math.max(now, sstable.maxDataAge);
          truncatedAt = now;
  
 -        Runnable truncateRunnable = () -> {
 -            logger.debug("Discarding sstable data for truncated CF + 
indexes");
 -            data.notifyTruncated(truncatedAt);
 +        Runnable truncateRunnable = new Runnable()
 +        {
 +            public void run()
 +            {
 +                logger.info("Truncating {}.{} with truncatedAt={}", 
keyspace.getName(), getTableName(), truncatedAt);
 +                // since truncation can happen at different times on 
different nodes, we need to make sure
 +                // that any repairs are aborted, otherwise we might clear the 
data on one node and then
 +                // stream in data that is actually supposed to have been 
deleted
 +                ActiveRepairService.instance.abort((prs) -> 
prs.getTableIds().contains(metadata.id),
 +                                                   "Stopping parent sessions 
{} due to truncation of tableId="+metadata.id);
 +                data.notifyTruncated(truncatedAt);
  
-                 if (DatabaseDescriptor.isAutoSnapshot())
-                     
snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, 
SNAPSHOT_TRUNCATE_PREFIX));
+             if (DatabaseDescriptor.isAutoSnapshot())
+                 snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, 
SNAPSHOT_TRUNCATE_PREFIX));
  
-                 discardSSTables(truncatedAt);
+             discardSSTables(truncatedAt);
  
-                 indexManager.truncateAllIndexesBlocking(truncatedAt);
-                 viewManager.truncateBlocking(replayAfter, truncatedAt);
+             indexManager.truncateAllIndexesBlocking(truncatedAt);
+             viewManager.truncateBlocking(replayAfter, truncatedAt);
  
 -            SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, 
truncatedAt, replayAfter);
 -            logger.trace("cleaning out row cache");
 -            invalidateCaches();
 +                SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, 
truncatedAt, replayAfter);
 +                logger.trace("cleaning out row cache");
 +                invalidateCaches();
 +
 +            }
          };
  
          runWithCompactionsDisabled(Executors.callable(truncateRunnable), 
true, true);
@@@ -2739,16 -2711,16 +2768,16 @@@
          if (keyspace == null)
              return null;
  
 -        UUID id = Schema.instance.getId(ksName, cfName);
 -        if (id == null)
 +        TableMetadata table = Schema.instance.getTableMetadata(ksName, 
cfName);
 +        if (table == null)
              return null;
  
 -        return keyspace.getColumnFamilyStore(id);
 +        return keyspace.getColumnFamilyStore(table.id);
      }
  
 -    public static TableMetrics metricsFor(UUID tableId)
 +    public static TableMetrics metricsFor(TableId tableId)
      {
-         return getIfExists(tableId).metric;
+         return Objects.requireNonNull(getIfExists(tableId)).metric;
      }
  
      public DiskBoundaries getDiskBoundaries()
@@@ -2760,21 -2732,4 +2789,21 @@@
      {
          diskBoundaryManager.invalidate();
      }
 +
 +    @Override
 +    public void setNeverPurgeTombstones(boolean value)
 +    {
 +        if (neverPurgeTombstones != value)
 +            logger.info("Changing neverPurgeTombstones for {}.{} from {} to 
{}", keyspace.getName(), getTableName(), neverPurgeTombstones, value);
 +        else
 +            logger.info("Not changing neverPurgeTombstones for {}.{}, it is 
{}", keyspace.getName(), getTableName(), neverPurgeTombstones);
 +
 +        neverPurgeTombstones = value;
 +    }
 +
 +    @Override
 +    public boolean getNeverPurgeTombstones()
 +    {
 +        return neverPurgeTombstones;
 +    }
- }
+ }
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index cdbe163,ae8b8d3..73c6416
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -87,13 -76,17 +90,13 @@@ public class Memtable implements Compar
          switch (DatabaseDescriptor.getMemtableAllocationType())
          {
              case unslabbed_heap_buffers:
-                 return new HeapPool(heapLimit, 
DatabaseDescriptor.getMemtableCleanupThreshold(), new 
ColumnFamilyStore.FlushLargestColumnFamily());
+                 return new HeapPool(heapLimit, cleaningThreshold, cleaner);
              case heap_buffers:
-                 return new SlabPool(heapLimit, 0, 
DatabaseDescriptor.getMemtableCleanupThreshold(), new 
ColumnFamilyStore.FlushLargestColumnFamily());
+                 return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner);
              case offheap_buffers:
-                 return new SlabPool(heapLimit, offHeapLimit, 
DatabaseDescriptor.getMemtableCleanupThreshold(), new 
ColumnFamilyStore.FlushLargestColumnFamily());
 -                if (!FileUtils.isCleanerAvailable)
 -                {
 -                    throw new IllegalStateException("Could not free direct 
byte buffer: offheap_buffers is not a safe memtable_allocation_type without 
this ability, please adjust your config. This feature is only guaranteed to 
work on an Oracle JVM. Refusing to start.");
 -                }
+                 return new SlabPool(heapLimit, offHeapLimit, 
cleaningThreshold, cleaner);
              case offheap_objects:
-                 return new NativePool(heapLimit, offHeapLimit, 
DatabaseDescriptor.getMemtableCleanupThreshold(), new 
ColumnFamilyStore.FlushLargestColumnFamily());
+                 return new NativePool(heapLimit, offHeapLimit, 
cleaningThreshold, cleaner);
              default:
                  throw new AssertionError();
          }
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 5ef023f,89d5e37..58b2910
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -74,8 -80,19 +80,18 @@@ public abstract class MemtablePoo
          ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
 -
      public abstract MemtableAllocator newAllocator();
  
+     public boolean needsCleaning()
+     {
+         return onHeap.needsCleaning() || offHeap.needsCleaning();
+     }
+ 
+     public Long getNumPendingtasks()
+     {
+         return numPendingTasks.getValue();
+     }
+ 
      /**
       * Note the difference between acquire() and allocate(); allocate() makes 
more resources available to all owners,
       * and acquire() makes shared resources unavailable but still recorded. 
An Owner must always acquire resources,
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index beff1e4,e09c4df..96cd105
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -22,30 -22,15 +22,29 @@@ import java.io.IOException
  import java.math.BigDecimal;
  import java.math.BigInteger;
  import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.MalformedURLException;
  import java.net.ServerSocket;
  import java.nio.ByteBuffer;
 +import java.rmi.server.RMISocketFactory;
  import java.util.*;
  import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Consumer;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
  import java.util.stream.Collectors;
  
 +import javax.management.MBeanServerConnection;
 +import javax.management.remote.JMXConnector;
 +import javax.management.remote.JMXConnectorFactory;
 +import javax.management.remote.JMXConnectorServer;
 +import javax.management.remote.JMXServiceURL;
 +import javax.management.remote.rmi.RMIConnectorServer;
 +
  import com.google.common.base.Objects;
 +import com.google.common.base.Strings;
  import com.google.common.collect.ImmutableSet;
  import org.junit.*;
  import org.slf4j.Logger;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to