This is an automated email from the ASF dual-hosted git repository. edimitrova pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1b4e1cc9303414f91535dab10f6342e1c1c6b8ac Merge: 7214794 11cb810 Author: Ekaterina Dimitrova <[email protected]> AuthorDate: Tue Jan 19 17:59:23 2021 -0500 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../cassandra/concurrent/InfiniteLoopExecutor.java | 8 + .../cassandra/config/DatabaseDescriptor.java | 14 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 249 ++++++++++++--------- src/java/org/apache/cassandra/db/Memtable.java | 13 +- src/java/org/apache/cassandra/db/ReadCommand.java | 2 +- .../apache/cassandra/utils/memory/HeapPool.java | 2 +- .../cassandra/utils/memory/MemtableAllocator.java | 121 ++++++++-- .../cassandra/utils/memory/MemtableCleaner.java | 40 ++++ .../utils/memory/MemtableCleanerThread.java | 67 +++++- .../cassandra/utils/memory/MemtablePool.java | 35 ++- .../apache/cassandra/utils/memory/NativePool.java | 2 +- .../apache/cassandra/utils/memory/SlabPool.java | 2 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 14 +- .../org/apache/cassandra/db/NativeCellTest.java | 16 +- .../utils/memory/MemtableCleanerThreadTest.java | 187 ++++++++++++++++ .../utils/memory/NativeAllocatorTest.java | 204 ++++++++++------- 17 files changed, 709 insertions(+), 268 deletions(-) diff --cc CHANGES.txt index 0d7b92b,29723d4..3430527 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,51 -1,12 +1,52 @@@ -3.11.10 +4.0-beta5 + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) * Fix DecimalDeserializer#toString 
OOM (CASSANDRA-14925) - * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161) - * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) +Merged from 3.11: Merged from 3.0: + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) - * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) + +4.0-beta4 + * DROP COMPACT STORAGE should invalidate prepared statements still using CompactTableMetadata (CASSANDRA-16361) + * Update default num_tokens to 16 and allocate_tokens_for_local_replication_factor to 3 (CASSANDRA-13701) + * Remove use of String.intern() (CASSANDRA-15810) + * Fix the missing bb position in ByteBufferAccessor.getUnsignedShort (CASSANDRA-16249) + * Make sure OOM errors are rethrown on truncation failure (CASSANDRA-16254) + * Send back client warnings when creating too many tables/keyspaces (CASSANDRA-16309) + * Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143) + * Add generatetokens script for offline token allocation strategy generation (CASSANDRA-16205) + * Remove Windows scripts (CASSANDRA-16171) + * Improve checksumming and compression in protocol V5 (CASSANDRA-15299) + * Optimised repair streaming improvements (CASSANDRA-16274) + * Update jctools dependency to 3.1.0 (CASSANDRA-16255) + * 'SSLEngine closed already' exception on failed outbound connection (CASSANDRA-16277) + * Drain and/or shutdown might throw because of slow messaging service shutdown (CASSANDRA-16276) + * Upgrade JNA to 5.6.0, dropping support for <=glibc-2.6 systems (CASSANDRA-16212) + * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246) + * Ensure that 
CacheMetrics.requests is picked up by the metric reporter (CASSANDRA-16228) + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019) + * Produce consistent tombstone for reads to avoid digest mistmatch (CASSANDRA-15369) + * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235) + * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103) + * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214) + * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144) + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214) + * When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095) + * Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325) + * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097) + * Cannot replace_address /X because it doesn't exist in gossip (CASSANDRA-16213) + * cqlsh row_id resets on page boundaries (CASSANDRA-16160) +Merged from 3.11: + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) +Merged from 3.0: * Extend the exclusion of replica filtering protection to other indices instead of just SASI (CASSANDRA-16311) * Synchronize transaction logs for JBOD (CASSANDRA-16225) * Fix the counting of cells per partition (CASSANDRA-16259) diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 304a004,aa9a4cc..7ddba08 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -1502,14 -1389,9 +1502,14 @@@ public class DatabaseDescripto return System.getProperty(Config.PROPERTY_PREFIX + "allocate_tokens_for_keyspace", 
conf.allocate_tokens_for_keyspace); } + public static Integer getAllocateTokensForLocalRf() + { + return conf.allocate_tokens_for_local_replication_factor; + } + public static Collection<String> tokensFromString(String tokenString) { - List<String> tokens = new ArrayList<String>(); + List<String> tokens = new ArrayList<>(); if (tokenString != null) for (String token : StringUtils.split(tokenString, ',')) tokens.add(token.trim()); diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index ceacd0d,83241e5..d4818ac --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -107,15 -137,14 +108,15 @@@ public class ColumnFamilyStore implemen are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can have that many flushes going at the same time. */ - private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - Stage.KEEP_ALIVE_SECONDS, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtableFlushWriter"), - "internal"); + private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - StageManager.KEEPALIVE, ++ Stage.KEEP_ALIVE_SECONDS, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableFlushWriter"), + "internal"); private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length]; + static { for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++) @@@ -130,29 -159,36 +131,29 @@@ } // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - Stage.KEEP_ALIVE_SECONDS, - 
TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtablePostFlush"), - "internal"); - - private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, - Stage.KEEP_ALIVE_SECONDS, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtableReclaimMemory"), - "internal"); + private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, ++ Stage.KEEP_ALIVE_SECONDS, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtablePostFlush"), + "internal"); + + private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, ++ Stage.KEEP_ALIVE_SECONDS, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableReclaimMemory"), + "internal"); - private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; + private static final String[] COUNTER_NAMES = new String[]{"table", "count", "error", "value"}; private static final String[] COUNTER_DESCS = new String[] - { "partition key in raw hex bytes", - "value of this partition for given sampler", - "value is within the error bounds plus or minus of this", - "the partition key turned into a human readable format" }; + { "keyspace.tablename", + "number of occurances", + "error bounds", + "value" }; private static final CompositeType COUNTER_COMPOSITE_TYPE; - private static final TabularType COUNTER_TYPE; - - private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; - private static final String[] SAMPLER_DESCS = new String[] - { "cardinality of partitions", - "list of counter results" }; private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; - private static final CompositeType SAMPLING_RESULT; public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated"; public static final String 
SNAPSHOT_DROP_PREFIX = "dropped"; @@@ -627,12 -639,12 +624,12 @@@ * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files * and compacted sstables. Files that cannot be recognized will be ignored. */ - public static void scrubDataDirectories(CFMetaData metadata) throws StartupException + public static void scrubDataDirectories(TableMetadata metadata) throws StartupException { - Directories directories = new Directories(metadata, initialDirectories); + Directories directories = new Directories(metadata); Set<File> cleanedDirectories = new HashSet<>(); - // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) + // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) clearEphemeralSnapshots(directories); directories.removeTemporaryDirectories(); @@@ -678,7 -687,7 +675,7 @@@ } // cleanup incomplete saved caches - Pattern tmpCacheFilePattern = Pattern.compile(metadata.keyspace + "-" + metadata.name + "-(Key|Row)Cache.*\\.tmp$"); - Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); ++ Pattern tmpCacheFilePattern = Pattern.compile(metadata.keyspace + '-' + metadata.name + "-(Key|Row)Cache.*\\.tmp$"); File dir = new File(DatabaseDescriptor.getSavedCachesLocation()); if (dir.exists()) @@@ -1645,8 -1691,8 +1682,8 @@@ return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec, filter.selectsAllPartition(), - metadata.enforceStrictLiveness())) + enforceStrictLiveness)) - || filter.isFullyCoveredBy(cached); + || filter.isFullyCoveredBy(cached); } public int gcBefore(int nowInSec) @@@ -2104,10 -2131,10 +2141,10 @@@ { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. 
return; - CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount); + CacheService.instance.counterCache.put(CounterCacheKey.create(metadata(), partitionKey, clustering, column, path), clockAndCount); } - public void forceMajorCompaction() throws InterruptedException, ExecutionException + public void forceMajorCompaction() { forceMajorCompaction(false); } @@@ -2226,31 -2249,21 +2259,31 @@@ now = Math.max(now, sstable.maxDataAge); truncatedAt = now; - Runnable truncateRunnable = () -> { - logger.debug("Discarding sstable data for truncated CF + indexes"); - data.notifyTruncated(truncatedAt); + Runnable truncateRunnable = new Runnable() + { + public void run() + { + logger.info("Truncating {}.{} with truncatedAt={}", keyspace.getName(), getTableName(), truncatedAt); + // since truncation can happen at different times on different nodes, we need to make sure + // that any repairs are aborted, otherwise we might clear the data on one node and then + // stream in data that is actually supposed to have been deleted + ActiveRepairService.instance.abort((prs) -> prs.getTableIds().contains(metadata.id), + "Stopping parent sessions {} due to truncation of tableId="+metadata.id); + data.notifyTruncated(truncatedAt); - if (DatabaseDescriptor.isAutoSnapshot()) - snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); + if (DatabaseDescriptor.isAutoSnapshot()) + snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); - discardSSTables(truncatedAt); + discardSSTables(truncatedAt); - indexManager.truncateAllIndexesBlocking(truncatedAt); - viewManager.truncateBlocking(replayAfter, truncatedAt); + indexManager.truncateAllIndexesBlocking(truncatedAt); + viewManager.truncateBlocking(replayAfter, truncatedAt); - SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); - logger.trace("cleaning out row cache"); 
- invalidateCaches(); + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); + logger.trace("cleaning out row cache"); + invalidateCaches(); + + } }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); @@@ -2739,16 -2711,16 +2768,16 @@@ if (keyspace == null) return null; - UUID id = Schema.instance.getId(ksName, cfName); - if (id == null) + TableMetadata table = Schema.instance.getTableMetadata(ksName, cfName); + if (table == null) return null; - return keyspace.getColumnFamilyStore(id); + return keyspace.getColumnFamilyStore(table.id); } - public static TableMetrics metricsFor(UUID tableId) + public static TableMetrics metricsFor(TableId tableId) { - return getIfExists(tableId).metric; + return Objects.requireNonNull(getIfExists(tableId)).metric; } public DiskBoundaries getDiskBoundaries() @@@ -2760,21 -2732,4 +2789,21 @@@ { diskBoundaryManager.invalidate(); } + + @Override + public void setNeverPurgeTombstones(boolean value) + { + if (neverPurgeTombstones != value) + logger.info("Changing neverPurgeTombstones for {}.{} from {} to {}", keyspace.getName(), getTableName(), neverPurgeTombstones, value); + else + logger.info("Not changing neverPurgeTombstones for {}.{}, it is {}", keyspace.getName(), getTableName(), neverPurgeTombstones); + + neverPurgeTombstones = value; + } + + @Override + public boolean getNeverPurgeTombstones() + { + return neverPurgeTombstones; + } - } + } diff --cc src/java/org/apache/cassandra/db/Memtable.java index cdbe163,ae8b8d3..73c6416 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -87,13 -76,17 +90,13 @@@ public class Memtable implements Compar switch (DatabaseDescriptor.getMemtableAllocationType()) { case unslabbed_heap_buffers: - return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new HeapPool(heapLimit, cleaningThreshold, 
cleaner); case heap_buffers: - return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner); case offheap_buffers: - return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); - if (!FileUtils.isCleanerAvailable) - { - throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start."); - } + return new SlabPool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); case offheap_objects: - return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + return new NativePool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); default: throw new AssertionError(); } diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java index 5ef023f,89d5e37..58b2910 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@@ -74,8 -80,19 +80,18 @@@ public abstract class MemtablePoo ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner); } - public abstract MemtableAllocator newAllocator(); + public boolean needsCleaning() + { + return onHeap.needsCleaning() || offHeap.needsCleaning(); + } + + public Long getNumPendingtasks() + { + return numPendingTasks.getValue(); + } + /** * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, * and acquire() makes shared resources unavailable but still recorded. 
An Owner must always acquire resources, diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index beff1e4,e09c4df..96cd105 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -22,30 -22,15 +22,29 @@@ import java.io.IOException import java.math.BigDecimal; import java.math.BigInteger; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.rmi.server.RMISocketFactory; import java.util.*; import java.util.concurrent.CountDownLatch; - import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import javax.management.MBeanServerConnection; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; + import com.google.common.base.Objects; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import org.junit.*; import org.slf4j.Logger; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
