This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8aaaf21127207818fba154d2034f352635b3f381
Merge: fe0e04c231 446a9d1d01
Author: David Capwell <[email protected]>
AuthorDate: Thu May 18 10:38:07 2023 -0700

    Merge branch 'cassandra-4.1' into trunk

 CHANGES.txt                                        |   2 +
 src/java/org/apache/cassandra/db/Directories.java  |   4 +-
 .../db/compaction/CompactionController.java        |   2 +-
 .../cassandra/db/compaction/CompactionTask.java    |  26 ++-
 .../test/CompactionOverlappingSSTableTest.java     | 117 +++++++++++
 .../org/apache/cassandra/db/DirectoriesTest.java   |   2 +-
 .../db/compaction/PartialCompactionsTest.java      | 233 +++++++++++++++++++++
 7 files changed, 374 insertions(+), 12 deletions(-)
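
[Editor's note] The substantive change merged here is CASSANDRA-18507 ("Partial
compaction can resurrect deleted data"): when CompactionTask drops sstables
from a compaction because disk space is short, the CompactionController's
overlap set - which decides whether tombstones may be purged - was left stale.
The hypothetical, simplified Java model below (not the actual Cassandra
classes) illustrates the failure mode and the shape of the fix:

import java.util.HashSet;
import java.util.Set;

class ResurrectionSketch
{
    Set<String> live = new HashSet<>();      // all live sstables, by name
    Set<String> overlaps = new HashSet<>();  // live sstables outside the compaction

    void runCompaction(Set<String> candidates, boolean refreshAfterReduce)
    {
        refreshOverlaps(candidates);              // computed for the full candidate set
        candidates.remove(largestOf(candidates)); // emulate the low-disk-space fallback
        if (refreshAfterReduce)
            refreshOverlaps(candidates);          // the fix merged here
        // purge rule: a tombstone may only be dropped when no sstable in
        // `overlaps` can still hold older data for its partition; with a stale
        // `overlaps`, the dropped sstable is invisible and deleted data returns
    }

    void refreshOverlaps(Set<String> candidates)
    {
        overlaps = new HashSet<>(live);
        overlaps.removeAll(candidates);
    }

    String largestOf(Set<String> s) { return s.iterator().next(); } // stand-in
}
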

diff --cc CHANGES.txt
index 5fdc11b760,6167e04416..f648f5fafc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,144 -1,4 +1,146 @@@
 +5.0
 + * Fix sstable formats configuration (CASSANDRA-18441)
 + * Add guardrail to bound timestamps (CASSANDRA-18352)
 + * Add keyspace_name column to system_views.clients (CASSANDRA-18525)
 + * Moved system properties and envs to CassandraRelevantProperties and CassandraRelevantEnv respectively (CASSANDRA-17797)
 + * Add sstablepartitions offline tool to find large partitions in sstables (CASSANDRA-8720)
 + * Replace usages of json-simple dependency by Jackson (CASSANDRA-16855)
 + * When decommissioning should set Severity to limit traffic (CASSANDRA-18430)
 + * For Java11 and Java17 remove -XX:-UseBiasedLocking as it is the default already (CASSANDRA-17869)
 + * Upgrade commons-io to 2.11.0 (CASSANDRA-17364)
 + * Node Draining Should Abort All Current SSTables Imports (CASSANDRA-18373)
 + * Use snake case for the names of CQL native functions (CASSANDRA-18037)
 + * Use jdk-dependent checkstyle version to check the source code (CASSANDRA-18262)
 + * Provide summary of failed SessionInfo's in StreamResultFuture (CASSANDRA-17199)
 + * CEP-20: Dynamic Data Masking (CASSANDRA-17940)
 + * Add system_views.snapshots virtual table (CASSANDRA-18102)
 + * Update OpenHFT dependencies (chronicle-queue, chronicle-core, chronicle-bytes, chronicle-wire, chronicle-threads) (CASSANDRA-18049)
 + * Remove org.apache.cassandra.hadoop code (CASSANDRA-18323)
 + * Remove deprecated CQL functions dateOf and unixTimestampOf (CASSANDRA-18328)
 + * Remove DateTieredCompactionStrategy (CASSANDRA-18043)
 + * Add system_views.max_sstable_size and system_views.max_sstable_duration tables (CASSANDRA-18333)
 + * Extend implicit allow-filtering for virtual tables to clustering columns (CASSANDRA-18331)
 + * Upgrade maven-shade-plugin to 3.4.1 to fix shaded dtest JAR build (CASSANDRA-18136)
 + * Upgrade to Opcodes.ASM9 (CASSANDRA-17971)
 + * Add MaxSSTableSize and MaxSSTableDuration metrics and propagate them together with local read/write ratio to tablestats (CASSANDRA-18283)
 + * Add more logging around CompactionManager operations (CASSANDRA-18268)
 + * Reduce memory allocations of calls to ByteBuffer.duplicate() made in org.apache.cassandra.transport.CBUtil#writeValue (CASSANDRA-18212)
 + * CEP-17: SSTable API (CASSANDRA-17056)
 + * Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set (CASSANDRA-18292)
 + * Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561)
 + * More accurate skipping of sstables in read path (CASSANDRA-18134)
 + * Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258)
 + * Remove Scripted UDFs internals; hooks to be added later in CASSANDRA-17281 (CASSANDRA-18252)
 + * Update JNA to 5.13.0 (CASSANDRA-18050)
 + * Make virtual tables decide if they implicitly enable ALLOW FILTERING (CASSANDRA-18238)
 + * Add row, tombstone, and sstable count to nodetool profileload (CASSANDRA-18022)
 + * Coordinator level metrics for read response and mutation row and column counts (CASSANDRA-18155)
 + * Add CQL functions for dynamic data masking (CASSANDRA-17941)
 + * Print friendly error when nodetool attempts to connect to uninitialized server (CASSANDRA-11537)
 + * Use G1GC by default, and update default G1GC settings (CASSANDRA-18027)
 + * SimpleSeedProvider can resolve multiple IP addresses per DNS record (CASSANDRA-14361)
 + * Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152)
 + * Add compaction_properties column to system.compaction_history table and nodetool compactionhistory command (CASSANDRA-18061)
 + * Remove ProtocolVersion entirely from the CollectionSerializer ecosystem (CASSANDRA-18114)
 + * Fix serialization error in new getsstables --show-levels option (CASSANDRA-18140)
 + * Use checked casts when reading vints as ints (CASSANDRA-18099)
 + * Add Mutation Serialization Caching (CASSANDRA-17998)
 + * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874)
 + * CEP-10: Simulator Java11 Support (CASSANDRA-17178)
 + * Set the major compaction type correctly for compactionstats (CASSANDRA-18055)
 + * Print exception message without stacktrace when nodetool commands fail on probe.getOwnershipWithPort() (CASSANDRA-18079)
 + * Add option to print level in nodetool getsstables output (CASSANDRA-18023)
 + * Implement a guardrail for not having zero default_time_to_live on tables with TWCS (CASSANDRA-18042)
 + * Add CQL scalar functions for collection aggregation (CASSANDRA-18060)
 + * Make cassandra.replayList property for CommitLogReplayer possible to react on keyspaces only (CASSANDRA-18044)
 + * Add Mathematical functions (CASSANDRA-17221)
 + * Make incremental backup configurable per table (CASSANDRA-15402)
 + * Change shebangs of Python scripts to resolve Python 3 from env command (CASSANDRA-17832)
 + * Add reasons to guardrail messages and consider guardrails in the error message for needed ALLOW FILTERING (CASSANDRA-17967)
 + * Add support for CQL functions on collections, tuples and UDTs (CASSANDRA-17811)
 + * Add flag to exclude nodes from local DC when running nodetool rebuild (CASSANDRA-17870)
 + * Adding endpoint verification option to client_encryption_options (CASSANDRA-18034)
 + * Replace 'wcwidth.py' with pypi module (CASSANDRA-17287)
 + * Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC grace for given table and partition keys (CASSANDRA-17711)
 + * Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, CREATE ROLE and DROP ROLE (CASSANDRA-16640)
 + * Nodetool bootstrap resume will now return an error if the operation fails (CASSANDRA-16491)
 + * Disable resumable bootstrap by default (CASSANDRA-17679)
 + * Include Git SHA in --verbose flag for nodetool version (CASSANDRA-17753)
 + * Update Byteman to 4.0.20 and Jacoco to 0.8.8 (CASSANDRA-16413)
 + * Add memtable option among possible tab completions for a table (CASSANDRA-17982)
 + * Adds a trie-based memtable implementation (CASSANDRA-17240)
 + * Further improves precision of memtable heap tracking (CASSANDRA-17240)
 + * Fix formatting of metrics documentation (CASSANDRA-17961)
 + * Keep sstable level when streaming for decommission and move (CASSANDRA-17969)
 + * Add Unavailables metric for CASWrite in the docs (CASSANDRA-16357)
 + * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946)
 + * IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908)
 + * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
 + * Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
 + * Revert Mockito downgrade from CASSANDRA-17750 (CASSANDRA-17496)
 + * Add --older-than and --older-than-timestamp options for nodetool clearsnapshots (CASSANDRA-16860)
 + * Fix "open RT bound as its last item" exception (CASSANDRA-17810)
 + * Fix leak of non-standard Java types in JMX MBeans `org.apache.cassandra.db:type=StorageService`
 +   and `org.apache.cassandra.db:type=RepairService` as clients using JMX cannot handle them. More details in NEWS.txt (CASSANDRA-17668)
 + * Deprecate Throwables.propagate usage (CASSANDRA-14218)
 + * Allow disabling hotness persistence for high sstable counts (CASSANDRA-17868)
 + * Prevent NullPointerException when changing neverPurgeTombstones from true to false (CASSANDRA-17897)
 + * Add metrics around storage usage and compression (CASSANDRA-17898)
 + * Remove usage of deprecated javax certificate classes (CASSANDRA-17867)
 + * Make sure preview repairs don't optimise streams unless configured to (CASSANDRA-17865)
 + * Optionally avoid hint transfer during decommission (CASSANDRA-17808)
 + * Make disabling auto snapshot on selected tables possible (CASSANDRA-10383)
 + * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
 + * Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
 + * Remove dependency on Maven Ant Tasks (CASSANDRA-17750)
 + * Update ASM(9.1 to 9.3), Mockito(1.10.10 to 1.12.13) and ByteBuddy(3.2.4 to 4.7.0) (CASSANDRA-17835)
 + * Add the ability for operators to loosen the definition of "empty" for edge cases (CASSANDRA-17842)
 + * Fix potential out of range exception on column index downsampling (CASSANDRA-17839)
 + * Introduce target directory to vtable output for sstable_tasks and for compactionstats (CASSANDRA-13010)
 + * Read/Write/Truncate throw RequestFailure in a race condition with callback timeouts, should return Timeout instead (CASSANDRA-17828)
 + * Add ability to log load profiles at fixed intervals (CASSANDRA-17821)
 + * Protect against Gossip backing up due to a quarantined endpoint without version information (CASSANDRA-17830)
 + * NPE in org.apache.cassandra.cql3.Attributes.getTimeToLive (CASSANDRA-17822)
 + * Add guardrail for column size (CASSANDRA-17151)
 + * When doing a host replacement, we need to check that the node is a live node before failing with "Cannot replace a live node..." (CASSANDRA-17805)
 + * Add support to generate a One-Shot heap dump on unhandled exceptions (CASSANDRA-17795)
 + * Rate-limit new client connection auth setup to avoid overwhelming bcrypt (CASSANDRA-17812)
 + * DataOutputBuffer#scratchBuffer can use off-heap or on-heap memory as a means to control memory allocations (CASSANDRA-16471)
 + * Add ability to read the TTLs and write times of the elements of a collection and/or UDT (CASSANDRA-8877)
 + * Removed Python < 2.7 support from formatting.py (CASSANDRA-17694)
 + * Cleanup pylint issues with pylexotron.py (CASSANDRA-17779)
 + * NPE bug in streaming checking if SSTable is being repaired (CASSANDRA-17801)
 + * Users of NativeLibrary should handle lack of JNA appropriately when running in client mode (CASSANDRA-17794)
 + * Warn on unknown directories found in system keyspace directory rather than kill node during startup checks (CASSANDRA-17777)
 + * Log duplicate rows sharing a partition key found in verify and scrub (CASSANDRA-17789)
 + * Add separate thread pool for Secondary Index building so it doesn't block compactions (CASSANDRA-17781)
 + * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
 + * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
 + * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
 + * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
 + * Remove ephemeral snapshot marker file and introduce a flag to SnapshotManifest (CASSANDRA-16911)
 + * Add a virtual table that exposes currently running queries (CASSANDRA-15241)
 + * Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
 + * Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
 + * Add ByteComparable API (CASSANDRA-6936)
 + * Add guardrail for maximum replication factor (CASSANDRA-17500)
 + * Increment CQLSH to version 6.2.0 for release 4.2 (CASSANDRA-17646)
 + * Adding support to perform certificate based internode authentication (CASSANDRA-17661)
 + * Option to disable CDC writes of repaired data (CASSANDRA-17666)
 + * When a node is bootstrapping it gets the whole gossip state but applies in random order causing some cases where StorageService will fail causing an instance to not show up in TokenMetadata (CASSANDRA-17676)
 + * Add CQLSH command SHOW REPLICAS (CASSANDRA-17577)
 + * Add guardrail to allow disabling of SimpleStrategy (CASSANDRA-17647)
 + * Change default directory permission to 750 in packaging (CASSANDRA-17470)
 + * Adding support for TLS client authentication for internode communication (CASSANDRA-17513)
 + * Add new CQL function maxWritetime (CASSANDRA-17425)
 + * Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
 + * Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
++Merged from 4.0:
++ * Partial compaction can resurrect deleted data (CASSANDRA-18507)
 +
 +
  4.1.2
 + * NPE when deserializing malformed collections from client (CASSANDRA-18505)
   * Allow keystore and truststore passwords to be nullable (CASSANDRA-18124)
   * Return snapshots with dots in their name in nodetool listsnapshots (CASSANDRA-18371)
   * Fix NPE when loading snapshots and data directory is one directory from root (CASSANDRA-18359)
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 2c1cbcd778,b16dd972f1..4e7347aa26
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -505,101 -488,43 +505,101 @@@ public class Directorie
          Collections.sort(candidates);
      }
  
 -    public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
 +    /**
 +     * Sums up the space required for ongoing streams + compactions + expected new write size per FileStore and checks
 +     * if there is enough space available.
 +     *
 +     * @param expectedNewWriteSizes where we expect to write the new compactions
 +     * @param totalCompactionWriteRemaining approximate amount of data current compactions are writing - keyed by
 +     *                                      the file store they are writing to (or, reading from actually, but since
 +     *                                      CASSANDRA-6696 we expect compactions to read and write from the same dir)
 +     * @return true if we expect to be able to write expectedNewWriteSizes to the available file stores
 +     */
-     public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
-                                                                Map<File, Long> totalCompactionWriteRemaining)
++    public boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
++                                                        Map<File, Long> totalCompactionWriteRemaining)
      {
 -        long writeSize = expectedTotalWriteSize / estimatedSSTables;
 -        long totalAvailable = 0L;
 +        return hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, Directories::getFileStore);
 +    }
  
 -        for (DataDirectory dataDir : paths)
 +    @VisibleForTesting
 +    public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
 +                                                               Map<File, Long> totalCompactionWriteRemaining,
 +                                                               Function<File, FileStore> filestoreMapper)
 +    {
 +        Map<FileStore, Long> newWriteSizesPerFileStore = perFileStore(expectedNewWriteSizes, filestoreMapper);
 +        Map<FileStore, Long> compactionsRemainingPerFileStore = perFileStore(totalCompactionWriteRemaining, filestoreMapper);
 +
 +        Map<FileStore, Long> totalPerFileStore = new HashMap<>();
 +        for (Map.Entry<FileStore, Long> entry : newWriteSizesPerFileStore.entrySet())
          {
 -            if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
 -                  continue;
 -            DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
 -            // exclude directory if its total writeSize does not fit to data directory
 -            logger.debug("DataDirectory {} has {} bytes available, checking if we can write {} bytes", dataDir.location, candidate.availableSpace, writeSize);
 -            if (candidate.availableSpace < writeSize)
 -            {
 -                logger.warn("DataDirectory {} can't be used for compaction. Only {} is available, but {} is the minimum write size.",
 -                            candidate.dataDirectory.location,
 -                            FileUtils.stringifyFileSize(candidate.availableSpace),
 -                            FileUtils.stringifyFileSize(writeSize));
 -                continue;
 -            }
 -            totalAvailable += candidate.availableSpace;
 +            long addedForFilestore = entry.getValue() + compactionsRemainingPerFileStore.getOrDefault(entry.getKey(), 0L);
 +            totalPerFileStore.merge(entry.getKey(), addedForFilestore, Long::sum);
          }
 +        return hasDiskSpaceForCompactionsAndStreams(totalPerFileStore);
 +    }
  
 -        if (totalAvailable <= expectedTotalWriteSize)
 +    /**
 +     * Checks if there is enough space on all file stores to write the given amount of data.
 +     * The data to write should be the total amount, ongoing writes + new writes.
 +     */
 +    public static boolean hasDiskSpaceForCompactionsAndStreams(Map<FileStore, Long> totalToWrite)
 +    {
 +        boolean hasSpace = true;
 +        for (Map.Entry<FileStore, Long> toWrite : totalToWrite.entrySet())
          {
 -            StringJoiner pathString = new StringJoiner(",", "[", "]");
 -            for (DataDirectory p: paths)
 +            long availableForCompaction = getAvailableSpaceForCompactions(toWrite.getKey());
 +            logger.debug("FileStore {} has {} bytes available, checking if we can write {} bytes", toWrite.getKey(), availableForCompaction, toWrite.getValue());
 +            if (availableForCompaction < toWrite.getValue())
              {
 -                pathString.add(p.location.toJavaIOFile().getAbsolutePath());
 +                logger.warn("FileStore {} has only {} available, but {} is needed",
 +                            toWrite.getKey(),
 +                            FileUtils.stringifyFileSize(availableForCompaction),
 +                            FileUtils.stringifyFileSize((long) toWrite.getValue()));
 +                hasSpace = false;
              }
 -            logger.warn("Insufficient disk space for compaction. Across {} there's only {} available, but {} is needed.",
 -                        pathString.toString(),
 -                        FileUtils.stringifyFileSize(totalAvailable),
 -                        FileUtils.stringifyFileSize(expectedTotalWriteSize));
 -            return false;
          }
 -        return true;
 +        return hasSpace;
 +    }
 +
 +    public static long getAvailableSpaceForCompactions(FileStore fileStore)
 +    {
 +        long availableSpace = 0;
 +        availableSpace = FileStoreUtils.tryGetSpace(fileStore, FileStore::getUsableSpace, e -> { throw new FSReadError(e, fileStore.name()); })
 +                         - DatabaseDescriptor.getMinFreeSpacePerDriveInBytes();
 +        return Math.max(0L, Math.round(availableSpace * DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive()));
 +    }
 +
 +    public static Map<FileStore, Long> perFileStore(Map<File, Long> perDirectory, Function<File, FileStore> filestoreMapper)
 +    {
 +        return perDirectory.entrySet()
 +                           .stream()
 +                           .collect(Collectors.toMap(entry -> filestoreMapper.apply(entry.getKey()),
 +                                                     Map.Entry::getValue,
 +                                                     Long::sum));
 +    }
 +
 +    public Set<FileStore> allFileStores(Function<File, FileStore> filestoreMapper)
 +    {
 +        return Arrays.stream(getWriteableLocations())
 +                     .map(this::getLocationForDisk)
 +                     .map(filestoreMapper)
 +                     .collect(Collectors.toSet());
 +    }
 +
 +    /**
 +     * Gets the filestore for the actual directory where the sstables are stored.
 +     * Handles the fact that an operator can symlink a table directory to a different filestore.
 +     */
 +    public static FileStore getFileStore(File directory)
 +    {
 +        try
 +        {
 +            return Files.getFileStore(directory.toPath());
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSReadError(e, directory);
 +        }
      }
  
      public DataDirectory[] getWriteableLocations()
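
[Editor's note] To make the new accounting concrete: per-directory byte counts
are collapsed onto their backing file store (two data directories on the same
volume are summed), ongoing compaction writes are added, and each volume is
checked on its own. A minimal standalone sketch, using String stand-ins for
java.nio.file.FileStore and hypothetical volume names:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class PerFileStoreAccountingSketch
{
    // mirrors Directories.perFileStore(...) above
    static Map<String, Long> perFileStore(Map<String, Long> perDirectory,
                                          Function<String, String> storeOf)
    {
        Map<String, Long> result = new HashMap<>();
        perDirectory.forEach((dir, bytes) -> result.merge(storeOf.apply(dir), bytes, Long::sum));
        return result;
    }

    public static void main(String[] args)
    {
        Map<String, Long> newWrites = Map.of("/data1/ks/tbl", 20L, "/data2/ks/tbl", 20L);
        Map<String, Long> ongoing = Map.of("/data1/ks/tbl", 20L);
        Function<String, String> storeOf = dir -> dir.startsWith("/data1") ? "vol1" : "vol2";

        Map<String, Long> total = perFileStore(newWrites, storeOf);
        perFileStore(ongoing, storeOf).forEach((store, bytes) -> total.merge(store, bytes, Long::sum));

        // vol1 must hold 40 bytes (new + ongoing), vol2 only 20; each volume is
        // then compared against its own usable space, as in the code above
        System.out.println(total); // e.g. {vol1=40, vol2=20} (entry order may vary)
    }
}
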
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 3038222849,64806319f8..1117a5df4f
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -113,13 -105,19 +113,13 @@@ public class CompactionController exten
              return;
          }
  
 -        for (SSTableReader reader : overlappingSSTables)
 -        {
 -            if (reader.isMarkedCompacted())
 -            {
 -                refreshOverlaps();
 -                return;
 -            }
 -        }
 +        if (overlappingSSTables == null || overlappingSSTables.stream().anyMatch(SSTableReader::isMarkedCompacted))
 +            refreshOverlaps();
      }
  
-     private void refreshOverlaps()
+     void refreshOverlaps()
      {
 -        if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones())
 +        if (NEVER_PURGE_TOMBSTONES_PROPERTY_VALUE || cfs.getNeverPurgeTombstones())
              return;
  
          if (this.overlappingSSTables != null)
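
[Editor's note] refreshOverlaps() loses its private modifier above so that
CompactionTask (next file in this diff) can trigger an overlap recomputation
after it shrinks the compaction scope; see the sketch following that hunk.
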
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4185200653,5fc8031966..b62bae5567
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -134,9 -131,13 +135,14 @@@ public class CompactionTask extends Abs
  
              final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
  
 +            TimeUUID taskId = transaction.opId();
              // select SSTables to compact based on available disk space.
-             buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId);
 -            if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables))
++            if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId))
+             {
+                 // The set of sstables has changed (one or more were excluded due to limited available disk space).
+                 // We need to recompute the overlaps between sstables.
+                 controller.refreshOverlaps();
+             }
  
              // sanity check: all sstables must belong to the same cfs
             assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
@@@ -355,9 -356,12 +361,11 @@@
  
      /*
      * Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
 -     * there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
 -     * other compactions.
 +     * there's enough space (in theory) to handle the compaction.
+      *
+      * @return true if there is enough disk space to execute the complete compaction, false if some sstables are excluded.
       */
-     protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables, TimeUUID taskId)
 -    protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
++    protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables, TimeUUID taskId)
      {
          if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
          {
@@@ -372,29 -376,13 +380,29 @@@
          while(!nonExpiredSSTables.isEmpty())
          {
              // Only consider write size of non expired SSTables
 -            long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
 -            long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
 +            long writeSize;
 +            try
 +            {
 +                writeSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
 +                Map<File, Long> expectedNewWriteSize = new HashMap<>();
 +                List<File> newCompactionDatadirs = cfs.getDirectoriesForFiles(nonExpiredSSTables);
 +                long writeSizePerOutputDatadir = writeSize / Math.max(newCompactionDatadirs.size(), 1);
 +                for (File directory : newCompactionDatadirs)
 +                    expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);
 +
 +                Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
  
 -            if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
 +                // todo: abort streams if they block compactions
-                 if (Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
++                if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
 +                    break;
 +            }
 +            catch (Exception e)
 +            {
 +                logger.error("Could not check if there is enough disk space for compaction {}", taskId, e);
                  break;
 +            }
  
 -            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
 +            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, writeSize))
              {
                  // we end up here if we can't take any more sstables out of the compaction.
                  // usually means we've run out of disk space
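
[Editor's note] A condensed, hypothetical model of the loop above: the
estimated output size is spread evenly over the candidate data directories,
the remaining write bytes of already-running compactions are added, and the
largest candidate is dropped until the per-file-store check passes. The names
below (buildCandidates, candidate sizes as a TreeSet) are illustrative
stand-ins, not the actual CompactionTask API:

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class ReduceScopeSketch
{
    // returns false when the scope shrank, so the caller knows to call
    // controller.refreshOverlaps() before purging any tombstones
    boolean buildCandidates(TreeSet<Long> candidateSizes, File[] dataDirs,
                            Map<File, Long> ongoingWritesPerDir)
    {
        boolean full = true;
        while (!candidateSizes.isEmpty())
        {
            long writeSize = candidateSizes.stream().mapToLong(Long::longValue).sum();
            Map<File, Long> newWrites = new HashMap<>();
            long perDir = writeSize / Math.max(dataDirs.length, 1);
            for (File dir : dataDirs)
                newWrites.put(dir, perDir);
            if (hasDiskSpace(newWrites, ongoingWritesPerDir))
                break;                                    // it fits; stop shrinking
            candidateSizes.remove(candidateSizes.last()); // drop the largest sstable
            full = false;
        }
        return full;
    }

    boolean hasDiskSpace(Map<File, Long> newWrites, Map<File, Long> ongoing)
    {
        return true; // stand-in for Directories.hasDiskSpaceForCompactionsAndStreams
    }
}
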
diff --cc test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
index 0000000000,6a65c91437..44caf85ff6
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
@@@ -1,0 -1,115 +1,117 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
++import java.io.File;
+ import java.io.IOException;
+ import java.util.Arrays;
++import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.stream.Collectors;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Uninterruptibles;
+ import org.junit.Test;
+ 
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class CompactionOverlappingSSTableTest extends TestBaseImpl
+ {
+     @Test
+     public void partialCompactionOverlappingTest() throws IOException, TimeoutException
+     {
+ 
+         try (Cluster cluster = init(builder().withNodes(1)
+                                              .withDataDirCount(1)
+                                              .withInstanceInitializer(BB::install)
+                                              .start()))
+         {
+             cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+             cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0"));
+             Set<Integer> expected = Sets.newHashSetWithExpectedSize(990);
+             for (int i = 0; i < 1000; i++)
+             {
+                 cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (id) values (?)"), ConsistencyLevel.ONE, i);
+                 if (i >= 10)
+                     expected.add(i);
+             }
+             cluster.get(1).flush(KEYSPACE);
+             for (int i = 0; i < 10; i++)
+             {
+                 cluster.coordinator(1).execute(withKeyspace("delete from 
%s.tbl where id = ?"), ConsistencyLevel.ONE, i);
+                 cluster.get(1).flush(KEYSPACE);
+             }
+             assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
+                                          .map(x -> x[0])
+                                          .collect(Collectors.toSet()));
+ 
+             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // make sure tombstones are gc:able
+ 
+             cluster.get(1).runOnInstance(() -> {
+                 BB.enabled.set(true);
+                 ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                 cfs.forceMajorCompaction();
+                 assertEquals("We should have 2 sstables (not 1) after major compaction since we reduced the scope of the compaction",
+                              2, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
+             });
+             assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
+                                          .map(x -> x[0])
+                                          .collect(Collectors.toSet()));
+         }
+     }
+ 
+     public static class BB
+     {
+         static AtomicBoolean enabled = new AtomicBoolean();
+         public static void install(ClassLoader cl, Integer i)
+         {
+             new ByteBuddy().rebase(Directories.class)
 -                           .method(named("hasAvailableDiskSpace").and(takesArguments(2)))
++                           .method(named("hasDiskSpaceForCompactionsAndStreams").and(takesArguments(2)))
+                            .intercept(MethodDelegation.to(BB.class))
+                            .make()
+                            .load(cl, ClassLoadingStrategy.Default.INJECTION);
+         }
+ 
 -        public static boolean hasAvailableDiskSpace(long ignore1, long ignore2)
++        public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File,Long> ignore1, Map<File,Long> ignore2)
+         {
+             if (enabled.get())
+             {
+                 enabled.set(false);
+                 return false;
+             }
+             return true;
+         }
+     }
+ }
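
[Editor's note] The BB helper above relies on Byte Buddy's rebase-and-delegate
pattern: inside the dtest instance's classloader, Directories is redefined so
that the matched method delegates to BB's static method of a compatible
signature, letting the test force exactly one "not enough disk space" answer.
A minimal standalone example of the same pattern (hypothetical Greeter class;
WRAPPER strategy instead of the INJECTION used above, since Greeter is already
loaded here):

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import static net.bytebuddy.matcher.ElementMatchers.named;

public class ByteBuddySketch
{
    public static class Greeter
    {
        public String greet() { return "hello"; }
    }

    // delegation target: static, with a compatible return type
    public static String intercept() { return "intercepted"; }

    public static void main(String[] args) throws Exception
    {
        Class<?> rebased = new ByteBuddy()
                           .rebase(Greeter.class)
                           .method(named("greet"))
                           .intercept(MethodDelegation.to(ByteBuddySketch.class))
                           .make()
                           .load(ByteBuddySketch.class.getClassLoader(),
                                 ClassLoadingStrategy.Default.WRAPPER)
                           .getLoaded();
        Object greeter = rebased.getDeclaredConstructor().newInstance();
        System.out.println(rebased.getMethod("greet").invoke(greeter)); // "intercepted"
    }
}
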
diff --cc test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 6954fe248a,c701516d1e..71a49cf4f1
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@@ -971,182 -972,89 +971,182 @@@ public class DirectoriesTes
      }
  
      @Test
 -    public void testHasAvailableDiskSpace()
 +    public void testHasAvailableSpace()
      {
 -        DataDirectory[] dataDirectories = new DataDirectory[]
 -                                          {
 -                                          new DataDirectory(new File("/nearlyFullDir1"))
 -                                          {
 -                                              public long getAvailableSpace()
 -                                              {
 -                                                  return 11L;
 -                                              }
 -                                          },
 -                                          new DataDirectory(new File("/uniformDir2"))
 -                                          {
 -                                              public long getAvailableSpace()
 -                                              {
 -                                                  return 999L;
 -                                              }
 -                                          },
 -                                          new DataDirectory(new File("/veryFullDir"))
 -                                          {
 -                                              public long getAvailableSpace()
 -                                              {
 -                                                  return 4L;
 -                                              }
 -                                          }
 -                                          };
 +        double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
 +        long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
 +        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
 +        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
 +        try
 +        {
 +            FakeFileStore fs1 = new FakeFileStore();
 +            FakeFileStore fs2 = new FakeFileStore();
 +            FakeFileStore fs3 = new FakeFileStore();
 +            Map<FileStore, Long> writes = new HashMap<>();
 +
 +            fs1.usableSpace = 30;
 +            fs2.usableSpace = 30;
 +            fs3.usableSpace = 30;
 +
 +            writes.put(fs1, 20L);
 +            writes.put(fs2, 20L);
 +            writes.put(fs3, 20L);
 +            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
 +
 +            fs1.usableSpace = 19;
 +            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
 +
 +            writes.put(fs2, 25L*1024*1024+9);
 +            fs2.usableSpace = 20L*1024*1024-9;
 +            writes.put(fs3, 999L*1024*1024*1024+9);
 +            fs2.usableSpace = 20L*1024+99;
 +            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
 +
 +            fs1.usableSpace = 30;
 +            fs2.usableSpace = 30;
 +            fs3.usableSpace = 30L*1024*1024*1024*1024;
 +
 +            writes.put(fs1, 20L);
 +            writes.put(fs2, 20L);
 +            writes.put(fs3, 30L*1024*1024*1024*1024+1);
 +            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
 +
 +            List<ILoggingEvent> filteredLog = filterLogByDiyId(listAppender.list);
 +            // Log messages can be out of order, even for the single thread (due to AsyncAppender?).
 +            // We can deal with it; it's sufficient to just check that all messages exist in the result
 +            assertEquals(17, filteredLog.size());
 +
 +            String logMsg = "30 bytes available, checking if we can write 20 bytes";
 +            checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 7);
 +            logMsg = "19 bytes available, checking if we can write 20 bytes";
 +            checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 2);
 +
 +
 +            logMsg = "19 bytes available, but 20 bytes is needed";
 +            checkFormattedMessage(filteredLog, Level.WARN, logMsg, 2);
 +            logMsg = "has only 20.1 KiB available, but 25 MiB is needed";
 +            checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 +            logMsg = "has only 30 bytes available, but 999 GiB is needed";
 +            checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 +            logMsg = "has only 30 TiB available, but 30 TiB is needed";
 +            checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
 +            DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
 +        }
 +    }
 +
 +    @Test
 +    public void testHasAvailableSpaceSumming()
 +    {
 +        double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
 +        long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
 +        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
 +        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
 +        try
 +        {
 +            FakeFileStore fs1 = new FakeFileStore();
 +            FakeFileStore fs2 = new FakeFileStore();
 +            Map<File, Long> expectedNewWriteSizes = new HashMap<>();
 +            Map<File, Long> totalCompactionWriteRemaining = new HashMap<>();
 +
 +            fs1.usableSpace = 100;
 +            fs2.usableSpace = 100;
 +
 +            File f1 = new File("f1");
 +            File f2 = new File("f2");
 +            File f3 = new File("f3");
 +
 +            expectedNewWriteSizes.put(f1, 20L);
 +            expectedNewWriteSizes.put(f2, 20L);
 +            expectedNewWriteSizes.put(f3, 20L);
 +
 +            totalCompactionWriteRemaining.put(f1, 20L);
 +            totalCompactionWriteRemaining.put(f2, 20L);
 +            totalCompactionWriteRemaining.put(f3, 20L);
 +            Function<File, FileStore> filestoreMapper = (f) -> {
 +                if (f == f1 || f == f2)
 +                    return fs1;
 +                return fs2;
 +            };
 +            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
 +            fs1.usableSpace = 79;
 +            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
 +            fs1.usableSpace = 81;
 +            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
 +
 +            expectedNewWriteSizes.clear();
 +            expectedNewWriteSizes.put(f1, 100L);
 +            totalCompactionWriteRemaining.clear();
 +            totalCompactionWriteRemaining.put(f2, 100L);
 +            fs1.usableSpace = 150;
 +
 +            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
 +            expectedNewWriteSizes.clear();
 +            expectedNewWriteSizes.put(f1, 100L);
 +            totalCompactionWriteRemaining.clear();
 +            totalCompactionWriteRemaining.put(f3, 500L);
 +            fs1.usableSpace = 150;
 +            fs2.usableSpace = 400; // too little space for the ongoing compaction, but this filestore does not affect the new compaction so it should be allowed
 +
 +            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
 +            DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace / FileUtils.ONE_MIB);
 +        }
 +    }
  
 -        Directories d = new Directories( ((TableMetadata) CFM.toArray()[0]), dataDirectories);
 -
 -        assertTrue(d.hasAvailableDiskSpace(1,2));
 -        assertTrue(d.hasAvailableDiskSpace(10,99));
 -        assertFalse(d.hasAvailableDiskSpace(10,1024));
 -        assertFalse(d.hasAvailableDiskSpace(1024,1024*1024));
 -
 -        List<ILoggingEvent> filteredLog = listAppender.list;
 -        //List<ILoggingEvent> filteredLog = filterLogByDiyId(listAppender.list);
 -        // Log messages can be out of order, even for the single thread (due to AsyncAppender?).
 -        // We can deal with it; it's sufficient to just check that all messages exist in the result
 -        assertEquals(23, filteredLog.size());
 -        String logMsgFormat = "DataDirectory %s has %d bytes available, checking if we can write %d bytes";
 -        String logMsg = String.format(logMsgFormat, "/nearlyFullDir1", 11, 2);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/uniformDir2", 999, 2);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", 4, 2);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/nearlyFullDir1", 11, 2);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/uniformDir2", 999, 9);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", 4, 9);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/nearlyFullDir1", 11, 102);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/uniformDir2", 999, 102);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", 4, 102);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/nearlyFullDir1", 11, 1024);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/uniformDir2", 999, 1024);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", 4, 1024);
 -        checkFormattedMessage(filteredLog, Level.DEBUG, logMsg, 1);
 -
 -        logMsgFormat = "DataDirectory %s can't be used for compaction. Only 
%s is available, but %s is the minimum write size.";
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", "4 bytes", "9 
bytes");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/nearlyFullDir1", "11 bytes", 
"102 bytes");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", "4 bytes", "102 
bytes");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/nearlyFullDir1", "11 bytes", 
"1 KiB");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/uniformDir2", "999 bytes", "1 
KiB");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, "/veryFullDir", "4 bytes", "1 
KiB");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -
 -        logMsgFormat = "Across %s there's only %s available, but %s is 
needed.";
 -        logMsg = String.format(logMsgFormat, 
"[/nearlyFullDir1,/uniformDir2,/veryFullDir]", "999 bytes", "1 KiB");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 -        logMsg = String.format(logMsgFormat, 
"[/nearlyFullDir1,/uniformDir2,/veryFullDir]", "0 bytes", "1 MiB");
 -        checkFormattedMessage(filteredLog, Level.WARN, logMsg, 1);
 +    private String getNewFilename(TableMetadata tm, boolean oldStyle)
 +    {
 +        return tm.keyspace + File.pathSeparator() + tm.name + (oldStyle ? "" : Component.separator + tm.id.toHexString()) + "/na-1-big-Data.db";
 +    }
 +
 +    private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize)
 +    {
 +        // copied from Directories.getWriteableLocation(long)
 +        List<Directories.DataDirectoryCandidate> candidates = new ArrayList<>();
 +
 +        long totalAvailable = 0L;
 +
 +        for (DataDirectory dataDir : dataDirectories)
 +        {
 +            Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
 +            // exclude directory if its total writeSize does not fit to data directory
 +            if (candidate.availableSpace < writeSize)
 +                continue;
 +            candidates.add(candidate);
 +            totalAvailable += candidate.availableSpace;
 +        }
 +
 +        Directories.sortWriteableCandidates(candidates, totalAvailable);
 +
 +        return candidates;
 +    }
 +
-     private static class FakeFileStore extends FileStore
++    public static class FakeFileStore extends FileStore
 +    {
 +        public long usableSpace = 100;
 +        public long getUsableSpace()
 +        {
 +            return usableSpace;
 +        }
 +        public String name() {return null;}
 +        public String type() {return null;}
 +        public boolean isReadOnly() {return false;}
 +        public long getTotalSpace() {return 0;}
 +        public long getUnallocatedSpace() {return 0;}
 +        public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type) {return false;}
 +        public boolean supportsFileAttributeView(String name) {return false;}
 +        public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type) {return null;}
 +        public Object getAttribute(String attribute) {return null;}
 +
 +        public String toString()
 +        {
 +            return "MockFileStore";
 +        }
      }
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
index 0000000000,b922ca8057..5e0ed66dbf
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
@@@ -1,0 -1,207 +1,233 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.Iterator;
++import java.util.Map;
+ 
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
++import org.apache.cassandra.db.DirectoriesTest;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.util.File;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.schema.TableMetadataRef;
+ import org.apache.cassandra.utils.CloseableIterator;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.hamcrest.MatcherAssert.assertThat;
+ import static org.hamcrest.CoreMatchers.instanceOf;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class PartialCompactionsTest extends SchemaLoader
+ {
+     static final String KEYSPACE = PartialCompactionsTest.class.getSimpleName();
+     static final String TABLE = "testtable";
+ 
+     @BeforeClass
+     public static void initSchema()
+     {
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+ 
+         LimitableDataDirectory.applyTo(KEYSPACE, TABLE);
+     }
+ 
+     @Before
+     public void prepareCFS()
+     {
+         LimitableDataDirectory.setAvailableSpace(cfStore(), null);
+     }
+ 
+     @After
+     public void truncateCF()
+     {
+         cfStore().truncateBlocking();
+         LifecycleTransaction.waitForDeletions();
+     }
+ 
+     private static ColumnFamilyStore cfStore()
+     {
+         return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+     }
+ 
+     @Test
+     public void shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace()
+     {
+         // given
+         ColumnFamilyStore cfs = cfStore();
+         int few = 10, many = 10 * few;
+ 
+         // a large sstable as the oldest
+         createDataSSTable(cfs, 0, many);
+         // more inserts (to have more than one sstable to compact)
+         createDataSSTable(cfs, many, many + few);
+         // delete data that's in both of the prior sstables
+         createTombstonesSSTable(cfs, many - few / 2, many + few / 2);
+ 
+         // emulate there not being enough space to compact all sstables
+         LimitableDataDirectory.setAvailableSpace(cfs, enoughSpaceForAllButTheLargestSSTable(cfs));
+ 
+         // when - run a compaction where all tombstones have timed out
+         FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false));
+ 
+         // then - the tombstones should not be removed
+         assertEquals("live sstables after compaction", 2, 
cfs.getLiveSSTables().size());
+         assertEquals("remaining live rows after compaction", many, 
liveRows(cfs));
+     }
+ 
+     private static long enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs)
+     {
+         long totalSize = 1, maxSize = 0;
+         for (SSTableReader ssTable : cfs.getLiveSSTables())
+         {
+             long size = ssTable.onDiskLength();
+             if (size > maxSize) maxSize = size;
+             totalSize += size;
+         }
+         return totalSize - maxSize;
+     }
+ 
+     private static int liveRows(ColumnFamilyStore cfs)
+     {
+         return Util.getAll(Util.cmd(cfs, "key1").build()).stream()
+                    .map(partition -> count(partition.rowIterator()))
+                    .reduce(Integer::sum)
+                    .orElse(0);
+     }
+ 
+     private static int count(Iterator<?> iter)
+     {
+         try (CloseableIterator<?> unused = iter instanceof CloseableIterator ? (CloseableIterator<?>) iter : null)
+         {
+             int count = 0;
+             for (; iter.hasNext(); iter.next())
+             {
+                 count++;
+             }
+             return count;
+         }
+     }
+ 
+     private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, int endKey)
+     {
+         for (int i = firstKey; i < endKey; i++)
+         {
+             new RowUpdateBuilder(cfs.metadata(), 0, "key1")
+             .clustering(String.valueOf(i))
+             .add("val", String.valueOf(i))
+             .build()
+             .applyUnsafe();
+         }
+         cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+     }
+ 
+     private static void createTombstonesSSTable(ColumnFamilyStore cfs, int firstKey, int endKey)
+     {
+         for (int i = firstKey; i < endKey; i++)
+         {
+             RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", String.valueOf(i)).applyUnsafe();
+         }
+         cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+     }
+ 
+     private static class LimitableDataDirectory extends Directories.DataDirectory
+     {
+         private Long availableSpace;
+ 
+         LimitableDataDirectory(Directories.DataDirectory dataDirectory)
+         {
+             super(dataDirectory.location);
+         }
+ 
+         @Override
+         public long getAvailableSpace()
+         {
+             if (availableSpace != null)
+                 return availableSpace;
+             return super.getAvailableSpace();
+         }
+ 
+         public static void setAvailableSpace(ColumnFamilyStore cfs, Long availableSpace)
+         {
+             for (Directories.DataDirectory location : cfs.getDirectories().getWriteableLocations())
+             {
+                 assertThat("ColumnFamilyStore set up with ability to emulate 
limited disk space",
+                            location, 
instanceOf(LimitableDataDirectory.class));
+                 ((LimitableDataDirectory) location).availableSpace = 
availableSpace;
+             }
+         }
+ 
+         public static void applyTo(String ks, String cf)
+         {
+             Keyspace keyspace = Keyspace.open(ks);
+             ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+             TableMetadataRef metadata = store.metadata;
+             keyspace.dropCf(metadata.id, true);
+             ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, wrapDirectoriesOf(store), false, false, true);
+             keyspace.initCfCustom(cfs);
+         }
+ 
+         private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs)
+         {
+             Directories.DataDirectory[] original = cfs.getDirectories().getWriteableLocations();
+             Directories.DataDirectory[] wrapped = new Directories.DataDirectory[original.length];
+             for (int i = 0; i < wrapped.length; i++)
+             {
+                 wrapped[i] = new LimitableDataDirectory(original[i]);
+             }
 -            return new Directories(cfs.metadata(), wrapped);
++            return new Directories(cfs.metadata(), wrapped)
++            {
++                @Override
++                public boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes, Map<File, Long> totalCompactionWriteRemaining)
++                {
++                    return hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, file -> {
++                        for (DataDirectory location : getWriteableLocations())
++                        {
++                            if (file.toPath().startsWith(location.location.toPath())) {
++                                LimitableDataDirectory directory = (LimitableDataDirectory) location;
++                                if (directory.availableSpace != null)
++                                {
++                                    DirectoriesTest.FakeFileStore store = new DirectoriesTest.FakeFileStore();
++                                    // reverse the computation in Directories.getAvailableSpaceForCompactions
++                                    store.usableSpace = Math.round(directory.availableSpace / DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive()) + DatabaseDescriptor.getMinFreeSpacePerDriveInBytes();
++                                    return store;
++                                }
++                            }
++                        }
++                                    return store;
++                                }
++                            }
++                        }
++                        return Directories.getFileStore(file);
++                    });
++                }
++            };
+         }
+     }
+ }
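
[Editor's note] The "reverse the computation" trick above works because
Directories.getAvailableSpaceForCompactions is, ignoring rounding,
available = max(0, (usable - minFreeBytes) * maxSpaceFraction), so the wrapper
solves for the raw usable space that makes the forward computation land on the
emulated value. A small self-contained illustration (the 0.95 fraction and
50 MiB floor are illustrative values, not asserted defaults):

public class SpaceInversionSketch
{
    static final long MIN_FREE = 50L * 1024 * 1024; // illustrative floor
    static final double FRACTION = 0.95;            // illustrative fraction

    // forward: as in Directories.getAvailableSpaceForCompactions
    static long availableForCompactions(long usable)
    {
        return Math.max(0L, Math.round((usable - MIN_FREE) * FRACTION));
    }

    // inverse: as in LimitableDataDirectory's FakeFileStore above
    static long usableFor(long wantedAvailable)
    {
        return Math.round(wantedAvailable / FRACTION) + MIN_FREE;
    }

    public static void main(String[] args)
    {
        long wanted = 123_456L;
        // rounding makes the round trip approximate, which the test tolerates
        System.out.println(availableForCompactions(usableFor(wanted))); // ~123456
    }
}
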

