This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 7df4530882c44f6de942b225af455f6399b66302
Merge: 995c3abc42 5be1038c5d
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 12 09:46:57 2023 -0800

    Merge branch 'cassandra-4.1' into trunk

 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  10 ++
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  27 ++++
 .../org/apache/cassandra/config/DurationSpec.java  |  10 ++
 .../streaming/CassandraCompressedStreamReader.java |   7 +-
 .../streaming/CassandraCompressedStreamWriter.java |   3 +-
 .../CassandraEntireSSTableStreamReader.java        |   2 +-
 .../CassandraEntireSSTableStreamWriter.java        |   2 +-
 .../db/streaming/CassandraStreamReader.java        |   7 +-
 .../db/streaming/CassandraStreamWriter.java        |   6 +-
 .../apache/cassandra/streaming/ProgressInfo.java   |   5 +-
 .../apache/cassandra/streaming/StreamEvent.java    |   4 +-
 .../apache/cassandra/streaming/StreamManager.java  |  26 ++++
 .../cassandra/streaming/StreamManagerMBean.java    |  20 +++
 .../cassandra/streaming/StreamResultFuture.java    |  15 ++-
 .../apache/cassandra/streaming/StreamSession.java  |  22 ++--
 .../apache/cassandra/streaming/StreamingState.java | 143 +++++----------------
 .../management/ProgressInfoCompositeData.java      |   3 +
 .../test/streaming/RebuildStreamingTest.java       |  33 ++++-
 .../test/streaming/StreamingStatsDisabledTest.java |  65 ++++++++++
 .../distributed/util/QueryResultUtil.java          |   7 +
 .../db/virtual/StreamingVirtualTableTest.java      |  85 +++++++---
 .../cassandra/streaming/SessionInfoTest.java       |   4 +-
 24 files changed, 360 insertions(+), 150 deletions(-)

diff --cc CHANGES.txt
index 682316e397,c5d192a9c8..b1bc6f0cc9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,103 -1,27 +1,104 @@@
-4.1.1
+4.2
+ * Fix serialization error in new getsstables --show-levels option (CASSANDRA-18140)
+ * Use checked casts when reading vints as ints (CASSANDRA-18099)
+ * Add Mutation Serialization Caching (CASSANDRA-17998)
+ * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874)
+ * CEP-10: Simulator Java11 Support (CASSANDRA-17178)
+ * Set the major compaction type correctly for compactionstats (CASSANDRA-18055)
+ * Print exception message without stacktrace when nodetool commands fail on probe.getOwnershipWithPort() (CASSANDRA-18079)
+ * Add option to print level in nodetool getsstables output (CASSANDRA-18023)
+ * Implement a guardrail for not having zero default_time_to_live on tables with TWCS (CASSANDRA-18042)
+ * Add CQL scalar functions for collection aggregation (CASSANDRA-18060)
+ * Make cassandra.replayList property for CommitLogReplayer possible to react on keyspaces only (CASSANDRA-18044)
+ * Add Mathematical functions (CASSANDRA-17221)
+ * Make incremental backup configurable per table (CASSANDRA-15402)
+ * Change shebangs of Python scripts to resolve Python 3 from env command (CASSANDRA-17832)
+ * Add reasons to guardrail messages and consider guardrails in the error message for needed ALLOW FILTERING (CASSANDRA-17967)
+ * Add support for CQL functions on collections, tuples and UDTs (CASSANDRA-17811)
+ * Add flag to exclude nodes from local DC when running nodetool rebuild (CASSANDRA-17870)
+ * Adding endpoint verification option to client_encryption_options (CASSANDRA-18034)
+ * Replace 'wcwidth.py' with pypi module (CASSANDRA-17287)
+ * Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC grace for given table and partition keys (CASSANDRA-17711)
+ * Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, CREATE ROLE and DROP ROLE (CASSANDRA-16640)
+ * Nodetool bootstrap resume will now return an error if the operation fails (CASSANDRA-16491)
+ * Disable resumable bootstrap by default (CASSANDRA-17679)
+ * Include Git SHA in --verbose flag for nodetool version (CASSANDRA-17753)
+ * Update Byteman to 4.0.20 and Jacoco to 0.8.8 (CASSANDRA-16413)
+ * Add memtable option among possible tab completions for a table (CASSANDRA-17982)
+ * Adds a trie-based memtable implementation (CASSANDRA-17240)
+ * Further improves precision of memtable heap tracking (CASSANDRA-17240)
+ * Fix formatting of metrics documentation (CASSANDRA-17961)
+ * Keep sstable level when streaming for decommission and move (CASSANDRA-17969)
+ * Add Unavailables metric for CASWrite in the docs (CASSANDRA-16357)
+ * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946)
+ * IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908)
+ * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
+ * Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
+ * Revert Mockito downgrade from CASSANDRA-17750 (CASSANDRA-17496)
+ * Add --older-than and --older-than-timestamp options for nodetool clearsnapshots (CASSANDRA-16860)
+ * Fix "open RT bound as its last item" exception (CASSANDRA-17810)
+ * Fix leak of non-standard Java types in JMX MBeans `org.apache.cassandra.db:type=StorageService`
+   and `org.apache.cassandra.db:type=RepairService` as clients using JMX cannot handle them. More details in NEWS.txt (CASSANDRA-17668)
+ * Deprecate Throwables.propagate usage (CASSANDRA-14218)
+ * Allow disabling hotness persistence for high sstable counts (CASSANDRA-17868)
+ * Prevent NullPointerException when changing neverPurgeTombstones from true to false (CASSANDRA-17897)
+ * Add metrics around storage usage and compression (CASSANDRA-17898)
+ * Remove usage of deprecated javax certificate classes (CASSANDRA-17867)
+ * Make sure preview repairs don't optimise streams unless configured to (CASSANDRA-17865)
+ * Optionally avoid hint transfer during decommission (CASSANDRA-17808)
+ * Make disabling auto snapshot on selected tables possible (CASSANDRA-10383)
+ * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
+ * Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
+ * Remove dependency on Maven Ant Tasks (CASSANDRA-17750)
+ * Update ASM(9.1 to 9.3), Mockito(1.10.10 to 1.12.13) and ByteBuddy(3.2.4 to 4.7.0) (CASSANDRA-17835)
+ * Add the ability for operators to loosen the definition of "empty" for edge cases (CASSANDRA-17842)
+ * Fix potential out of range exception on column index downsampling (CASSANDRA-17839)
+ * Introduce target directory to vtable output for sstable_tasks and for compactionstats (CASSANDRA-13010)
+ * Read/Write/Truncate throw RequestFailure in a race condition with callback timeouts, should return Timeout instead (CASSANDRA-17828)
+ * Add ability to log load profiles at fixed intervals (CASSANDRA-17821)
+ * Protect against Gossip backing up due to a quarantined endpoint without version information (CASSANDRA-17830)
+ * NPE in org.apache.cassandra.cql3.Attributes.getTimeToLive (CASSANDRA-17822)
+ * Add guardrail for column size (CASSANDRA-17151)
+ * When doing a host replacement, we need to check that the node is a live node before failing with "Cannot replace a live node..." (CASSANDRA-17805)
+ * Add support to generate a One-Shot heap dump on unhandled exceptions (CASSANDRA-17795)
+ * Rate-limit new client connection auth setup to avoid overwhelming bcrypt (CASSANDRA-17812)
+ * DataOutputBuffer#scratchBuffer can use off-heap or on-heap memory as a means to control memory allocations (CASSANDRA-16471)
+ * Add ability to read the TTLs and write times of the elements of a collection and/or UDT (CASSANDRA-8877)
+ * Removed Python < 2.7 support from formatting.py (CASSANDRA-17694)
+ * Cleanup pylint issues with pylexotron.py (CASSANDRA-17779)
+ * NPE bug in streaming checking if SSTable is being repaired (CASSANDRA-17801)
+ * Users of NativeLibrary should handle lack of JNA appropriately when running in client mode (CASSANDRA-17794)
+ * Warn on unknown directories found in system keyspace directory rather than kill node during startup checks (CASSANDRA-17777)
+ * Log duplicate rows sharing a partition key found in verify and scrub (CASSANDRA-17789)
+ * Add separate thread pool for Secondary Index building so it doesn't block compactions (CASSANDRA-17781)
+ * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
+ * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
+ * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
+ * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
+ * Remove ephemeral snapshot marker file and introduce a flag to SnapshotManifest (CASSANDRA-16911)
+ * Add a virtual table that exposes currently running queries (CASSANDRA-15241)
+ * Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
+ * Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
+ * Add ByteComparable API (CASSANDRA-6936)
+ * Add guardrail for maximum replication factor (CASSANDRA-17500)
+ * Increment CQLSH to version 6.2.0 for release 4.2 (CASSANDRA-17646)
+ * Adding support to perform certificate based internode authentication (CASSANDRA-17661)
+ * Option to disable CDC writes of repaired data (CASSANDRA-17666)
+ * When a node is bootstrapping it gets the whole gossip state but applies in random order causing some cases where StorageService will fail causing an instance to not show up in TokenMetadata (CASSANDRA-17676)
+ * Add CQLSH command SHOW REPLICAS (CASSANDRA-17577)
+ * Add guardrail to allow disabling of SimpleStrategy (CASSANDRA-17647)
+ * Change default directory permission to 750 in packaging (CASSANDRA-17470)
+ * Adding support for TLS client authentication for internode communication (CASSANDRA-17513)
+ * Add new CQL function maxWritetime (CASSANDRA-17425)
+ * Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
+ * Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
+Merged from 4.1:
+ * Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
- * Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116)
-Merged from 4.0:
  * Avoid ConcurrentModificationException in STCS/DTCS/TWCS.getSSTables (CASSANDRA-17977)
- * Restore internode custom tracing on 4.0's new messaging system (CASSANDRA-17981)
-Merged from 3.11:
-Merged from 3.0:
- * Add to the IntelliJ Git Window issue navigation links to Cassandra's Jira (CASSANDRA-18126)
- * Avoid anticompaction mixing data from two different time windows with TWCS (CASSANDRA-17970)
- * Do not spam the logs with MigrationCoordinator not being able to pull schemas (CASSANDRA-18096)
-
-4.1.0
  * Fix ContentionStrategy backoff and Clock.waitUntil (CASSANDRA-18086)
-Merged from 4.0:
-Merged from 3.11:
-Merged from 3.0:
- * Fix incorrect resource name in LIST PERMISSION output (CASSANDRA-17848)
- * Suppress CVE-2022-41854 and similar (CASSANDRA-18083)
-
-
-4.1-rc1
  * Avoid schema mismatch problems on memtable API misconfiguration (CASSANDRA-18040)
  * Start Paxos auto repair in CassandraDaemon (CASSANDRA-18029)
+ * Fix error message about type hints (CASSANDRA-17915)
  * Restore streaming_keep_alive_period on the netty control streaming channel (CASSANDRA-17768)
  * Move Schema.FORCE_LOAD_KEYSPACES and Schema.FORCE_LOAD_KEYSPACES_PROP to CassandraRelevantProps (CASSANDRA-17783)
  * Add --resolve-ip option to nodetool gossipinfo (CASSANDRA-17934)

diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 0c2df7d419,811717f85d..3917e1a5fd
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -798,166 -778,9 +800,166 @@@ public class StreamSession implements I
      {
          if (isPreview())
              throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", planId()));
-         startStreamingFiles(true);
+         startStreamingFiles(PrepareDirection.ACK);
      }
  
+     /**
+      * In the case where we have an error checking disk space, we allow the operation to continue.
+      * In the case where we do _not_ have available space, this method throws a RuntimeException.
+      * TODO: Consider revising this to return a boolean and allowing callers upstream to handle that.
+      */
+     private void checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries)
+     {
+         if (DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
+             return;
+
+         boolean hasAvailableSpace = true;
+
+         try
+         {
+             hasAvailableSpace = checkAvailableDiskSpaceAndCompactions(summaries, planId(), peer.getHostAddress(true), pendingRepair != null);
+         }
+         catch (Exception e)
+         {
+             logger.error("[Stream #{}] Could not check available disk space and compactions for {}, summaries = {}", planId(), this, summaries, e);
+         }
+         if (!hasAvailableSpace)
+             throw new RuntimeException(String.format("Not enough disk space for stream %s, summaries=%s", this, summaries));
+     }
+
+     /**
+      * Makes sure that we expect to have enough disk space available for the new streams, taking into consideration
+      * the ongoing compactions and streams.
+      */
+     @VisibleForTesting
+     public static boolean checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries,
+                                                                 @Nullable TimeUUID planId,
+                                                                 @Nullable String remoteAddress,
+                                                                 boolean isForIncremental)
+     {
+         Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+         Map<TableId, Integer> perTableIdIncomingFiles = new HashMap<>();
+         long newStreamTotal = 0;
+         for (StreamSummary summary : summaries)
+         {
+             perTableIdIncomingFiles.merge(summary.tableId, summary.files, Integer::sum);
+             perTableIdIncomingBytes.merge(summary.tableId, summary.totalSize, Long::sum);
+             newStreamTotal += summary.totalSize;
+         }
+         if (perTableIdIncomingBytes.isEmpty() || newStreamTotal == 0)
+             return true;
+
+         return checkDiskSpace(perTableIdIncomingBytes, planId, Directories::getFileStore) &&
+                checkPendingCompactions(perTableIdIncomingBytes, perTableIdIncomingFiles, planId, remoteAddress, isForIncremental, newStreamTotal);
+     }
+
+     @VisibleForTesting
+     static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
+                                   TimeUUID planId,
+                                   Function<File, FileStore> fileStoreMapper)
+     {
+         Map<FileStore, Long> newStreamBytesToWritePerFileStore = new HashMap<>();
+         Set<FileStore> allFileStores = new HashSet<>();
+         // Sum up the incoming bytes per file store - we assume that the stream is evenly distributed over the writable
+         // file stores for the table.
+         for (Map.Entry<TableId, Long> entry : perTableIdIncomingBytes.entrySet())
+         {
+             ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(entry.getKey());
+             if (cfs == null || perTableIdIncomingBytes.get(entry.getKey()) == 0)
+                 continue;
+
+             Set<FileStore> allWriteableFileStores = cfs.getDirectories().allFileStores(fileStoreMapper);
+             if (allWriteableFileStores.isEmpty())
+             {
+                 logger.error("[Stream #{}] Could not get any writeable FileStores for {}.{}", planId, cfs.keyspace.getName(), cfs.getTableName());
+                 continue;
+             }
+             allFileStores.addAll(allWriteableFileStores);
+             long totalBytesInPerFileStore = entry.getValue() / allWriteableFileStores.size();
+             for (FileStore fs : allWriteableFileStores)
+                 newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
+         }
+         Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
+                                                                                       fileStoreMapper);
+         long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
+         long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
+         Map<FileStore, Long> allWriteData = new HashMap<>();
+         for (Map.Entry<FileStore, Long> fsBytes : newStreamBytesToWritePerFileStore.entrySet())
+             allWriteData.put(fsBytes.getKey(), fsBytes.getValue() +
+                                                totalBytesStreamRemainingPerFileStore +
+                                                totalCompactionWriteRemaining.getOrDefault(fsBytes.getKey(), 0L));
+
+         if (!Directories.hasDiskSpaceForCompactionsAndStreams(allWriteData))
+         {
+             logger.error("[Stream #{}] Not enough disk space to stream {} to {} (stream ongoing remaining={}, compaction ongoing remaining={}, all ongoing writes={})",
+                          planId,
+                          newStreamBytesToWritePerFileStore,
+                          perTableIdIncomingBytes.keySet().stream()
+                                                 .map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull)
+                                                 .map(cfs -> cfs.keyspace.getName() + '.' + cfs.name)
+                                                 .collect(Collectors.joining(",")),
+                          totalStreamRemaining,
+                          totalCompactionWriteRemaining,
+                          allWriteData);
+             return false;
+         }
+         return true;
+     }
+
+     @VisibleForTesting
+     static boolean checkPendingCompactions(Map<TableId, Long> perTableIdIncomingBytes,
+                                            Map<TableId, Integer> perTableIdIncomingFiles,
+                                            TimeUUID planId, String remoteAddress,
+                                            boolean isForIncremental,
+                                            long newStreamTotal)
+     {
+
+         int pendingCompactionsBeforeStreaming = 0;
+         int pendingCompactionsAfterStreaming = 0;
+         List<String> tables = new ArrayList<>(perTableIdIncomingFiles.size());
+         for (Keyspace ks : Keyspace.all())
+         {
+             Map<ColumnFamilyStore, TableId> cfStreamed = perTableIdIncomingBytes.keySet().stream()
+                                                                                 .filter(ks::hasColumnFamilyStore)
+                                                                                 .collect(Collectors.toMap(ks::getColumnFamilyStore, Function.identity()));
+             for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+             {
+                 CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
+                 int tasksOther = csm.getEstimatedRemainingTasks();
+                 int tasksStreamed = tasksOther;
+                 if (cfStreamed.containsKey(cfs))
+                 {
+                     TableId tableId = cfStreamed.get(cfs);
+                     tasksStreamed = csm.getEstimatedRemainingTasks(perTableIdIncomingFiles.get(tableId),
+                                                                    perTableIdIncomingBytes.get(tableId),
+                                                                    isForIncremental);
+                     tables.add(String.format("%s.%s", cfs.keyspace.getName(), cfs.name));
+                 }
+                 pendingCompactionsBeforeStreaming += tasksOther;
+                 pendingCompactionsAfterStreaming += tasksStreamed;
+             }
+         }
+         Collections.sort(tables);
+         int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+         if (pendingCompactionsAfterStreaming > pendingThreshold)
+         {
+             logger.error("[Stream #{}] Rejecting incoming files based on pending compactions calculation " +
+                          "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={}",
+                          planId, pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress);
+             return false;
+         }
+
+         long newStreamFiles = perTableIdIncomingFiles.values().stream().mapToInt(i -> i).sum();
+
+         logger.info("[Stream #{}] Accepting incoming files newStreamTotalSSTables={} newStreamTotalBytes={} " +
+                     "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={} " +
+                     "streamedTables=\"{}\"",
+                     planId, newStreamFiles, newStreamTotal,
+                     pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress,
+                     String.join(",", tables));
+         return true;
+     }
+
      /**
       * Call back after sending StreamMessageHeader.
       *
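
The new checkDiskSpace gate above spreads each table's incoming stream bytes evenly across that table's writable file stores, folds in the estimated remaining write bytes of ongoing compactions and streams, and only then asks Directories.hasDiskSpaceForCompactionsAndStreams whether the disks can absorb the total. A minimal standalone sketch of that accounting, with file stores reduced to plain string ids and every size invented for illustration (none of these names come from the commit):

import java.util.HashMap;
import java.util.Map;

public class DiskSpaceCheckSketch
{
    // Spread one table's incoming bytes evenly over its writable file stores,
    // mirroring the even-distribution assumption in the real check.
    static Map<String, Long> spreadEvenly(long incomingBytes, String... fileStores)
    {
        Map<String, Long> perStore = new HashMap<>();
        long share = incomingBytes / fileStores.length;
        for (String fs : fileStores)
            perStore.merge(fs, share, Long::sum);
        return perStore;
    }

    // Accept only if every file store can absorb its planned writes.
    static boolean hasSpace(Map<String, Long> plannedWrites, Map<String, Long> availableBytes)
    {
        for (Map.Entry<String, Long> e : plannedWrites.entrySet())
            if (e.getValue() > availableBytes.getOrDefault(e.getKey(), 0L))
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        // One table streaming 100 GiB over two file stores -> 50 GiB planned on each.
        Map<String, Long> planned = spreadEvenly(100L << 30, "fs1", "fs2");

        // Fold in estimated remaining ongoing writes, as the real check does:
        // compaction remainders are tracked per file store, while remaining
        // ongoing stream bytes are divided evenly across all involved stores.
        planned.merge("fs1", 10L << 30, Long::sum);          // ongoing compactions on fs1
        long perStoreStream = (20L << 30) / planned.size();  // 20 GiB of ongoing streams
        planned.replaceAll((fs, bytes) -> bytes + perStoreStream);

        // Invented capacities: fs1 must absorb 70 GiB of 80, fs2 60 GiB of 70.
        Map<String, Long> availableBytes = Map.of("fs1", 80L << 30, "fs2", 70L << 30);
        System.out.println("accept stream? " + hasSpace(planned, availableBytes)); // true
    }
}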
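
checkPendingCompactions applies a second gate: each table's estimated remaining compaction tasks are recomputed as if the incoming files had already landed, summed across all keyspaces, and the stream is rejected when that post-stream estimate exceeds the repair pending-compaction reject threshold. A sketch of just that decision, with the per-table post-stream estimates supplied as invented numbers (hypothetical class and method names, not the commit's API):

import java.util.Map;

public class PendingCompactionGateSketch
{
    // Sum per-table post-stream compaction-task estimates and compare them
    // to the operator-configured reject threshold, as the real gate does.
    static boolean acceptStream(Map<String, Integer> tasksAfterStreaming, int rejectThreshold)
    {
        int pendingAfter = tasksAfterStreaming.values().stream().mapToInt(Integer::intValue).sum();
        return pendingAfter <= rejectThreshold;
    }

    public static void main(String[] args)
    {
        // Invented estimates for two streamed tables once their files land.
        Map<String, Integer> tasksAfterStreaming = Map.of("ks.t1", 12, "ks.t2", 3);
        System.out.println("accept stream? " + acceptStream(tasksAfterStreaming, 1024)); // true: 15 <= 1024
    }
}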

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]