This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21382e6874 Fix CompressionDictionary being closed while still in use
21382e6874 is described below

commit 21382e68744eaa64881f9e3953c21cecf7aea199
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Dec 3 22:12:55 2025 -0800

    Fix CompressionDictionary being closed while still in use
    
    Patch by Yifan Cai; Reviewed by Stefan Miklosovic for CASSANDRA-21047
---
 CHANGES.txt                                        |   1 +
 .../db/compression/CompressionDictionary.java      |  82 +++++++++-
 .../db/compression/CompressionDictionaryCache.java |   8 +-
 .../db/compression/ZstdCompressionDictionary.java  | 100 ++++++++----
 .../cassandra/io/compress/CompressionMetadata.java |  77 ++++++++-
 .../io/compress/ZstdDictionaryCompressor.java      |  11 +-
 .../cassandra/utils/concurrent/RefCounted.java     |  24 ++-
 .../CompressionDictionaryCacheTest.java            |   4 +-
 .../compression/ZstdCompressionDictionaryTest.java |  62 +++++--
 .../io/compress/CompressionMetadataTest.java       | 179 +++++++++++++++++++++
 10 files changed, 480 insertions(+), 68 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6cd56b9ed2..c699a8f26b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
  * When updating a multi cell collection element, if the update is rejected 
then the shared Row.Builder is not freed causing all future mutations to be 
rejected (CASSANDRA-21055)
  * Schema annotations escape validation on CREATE and ALTER DDL statements 
(CASSANDRA-21046)
  * Calculate once and cache the result of ModificationStatement#requiresRead 
as a perf optimization (CASSANDRA-21040)
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index b3725cc815..fff6255c3f 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Objects;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -32,8 +33,41 @@ import com.google.common.hash.Hashing;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
+import org.apache.cassandra.utils.concurrent.Ref;
 
-public interface CompressionDictionary extends AutoCloseable
+/**
+ * Interface for compression dictionaries with reference-counted lifecycle 
management.
+ *
+ * <h2>Reference Counting Model</h2>
+ * Compression dictionaries hold native resources that must be explicitly 
managed. This interface
+ * uses {@link Ref} for safe lifecycle management across multiple concurrent 
users.
+ *
+ * <h3>Ownership and Usage in Cassandra</h3>
+ * <ul>
+ *   <li><b>CompressionDictionaryManager</b>: Holds the primary reference 
({@link #selfRef()}) for cached dictionaries</li>
+ *   <li><b>CompressionMetadata.Writer</b>: Acquires a reference during 
SSTable write, held for the writer's lifetime</li>
+ *   <li><b>CompressionMetadata</b>: Acquires a reference when created (via 
{@link #tryRef()}), held for the SSTable reader's lifetime.
+ *       All copies created via sharedCopy() share this single reference 
through WrappedSharedCloseable</li>
+ * </ul>
+ *
+ * <h3>Correctness Guarantee</h3>
+ * The reference counting prevents premature cleanup of native resources:
+ * <ol>
+ *   <li>CompressionMetadata acquires a reference when an SSTable is 
opened</li>
+ *   <li>Native resources remain valid as long as any reference exists 
(refcount &gt; 0)</li>
+ *   <li>Even if the cache evicts the dictionary, the SSTable's reference 
keeps resources alive</li>
+ *   <li>Cleanup runs exactly once when the last reference is released 
(refcount goes 0 → -1)</li>
+ *   <li>After cleanup, {@link #tryRef()} returns null, preventing new 
references to released resources</li>
+ * </ol>
+ *
+ * This ensures dictionaries cannot be freed while SSTables are using them for 
compression/decompression,
+ * even when the cache evicts the dictionary concurrently.
+ *
+ * @see Ref for reference counting implementation
+ * @see CompressionDictionaryManager for cache management
+ * @see org.apache.cassandra.io.compress.CompressionMetadata for SSTable usage
+ */
+public interface CompressionDictionary
 {
     /**
      * Get the dictionary id
@@ -75,6 +109,48 @@ public interface CompressionDictionary extends AutoCloseable
         return dictId().kind;
     }
 
+    /**
+     * Try to acquire a new reference to this dictionary.
+     * Returns null if the dictionary is already released.
+     * <p>
+     * The caller must ensure the returned reference is released when no 
longer needed,
+     * either by calling {@code ref.release()} or {@code ref.close()} (they 
are equivalent).
+     * Failing to release the reference will prevent cleanup of native 
resources and cause
+     * a memory leak.
+     *
+     * @return a new reference to this dictionary, or null if already released
+     */
+    Ref<? extends CompressionDictionary> tryRef();
+
+    /**
+     * Get the self-reference of this dictionary.
+     * This is used to release the primary reference held by the cache.
+     *
+     * @return the self-reference
+     */
+    Ref<? extends CompressionDictionary> selfRef();
+
+    /**
+     * Releases the self-reference of this dictionary.
+     * This is a convenience method equivalent to calling {@code 
selfRef().close()}.
+     * <p>
+     * This method is idempotent - calling it multiple times is safe and will 
only
+     * release the self-reference once. Subsequent calls have no effect.
+     * <p>
+     * This method is typically used when creating a dictionary outside the 
cache
+     * (e.g., in tests or temporary usage) and needing to clean it up. For 
dictionaries
+     * managed by the cache, the cache's removal listener handles cleanup via
+     * {@code selfRef().release()}.
+     *
+     * @see #selfRef()
+     * @see #tryRef()
+     */
+    @VisibleForTesting
+    default void close()
+    {
+        selfRef().close();
+    }
+
     /**
      * Write compression dictionary to file
      *
@@ -192,7 +268,7 @@ public interface CompressionDictionary extends AutoCloseable
             if (dict.length != storedLength)
             {
                 throw new IllegalStateException(String.format("Dictionary 
length mismatch for %s dict id %d. Expected: %d, actual: %d",
-                                                               kindStr, 
dictId, storedLength, dict.length));
+                                                              kindStr, dictId, 
storedLength, dict.length));
             }
 
             // Validate checksum
@@ -200,7 +276,7 @@ public interface CompressionDictionary extends AutoCloseable
             if (calculatedChecksum != storedChecksum)
             {
                 throw new IllegalStateException(String.format("Dictionary 
checksum mismatch for %s dict id %d. Expected: %d, actual: %d",
-                                                               kindStr, 
dictId, storedChecksum, calculatedChecksum));
+                                                              kindStr, dictId, 
storedChecksum, calculatedChecksum));
             }
 
             return kind.createDictionary(new DictId(kind, dictId), 
row.getByteArray("dict"), storedChecksum);
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
index 59d64b43e7..a58bd0e386 100644
--- 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
+++ 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
@@ -62,17 +62,17 @@ public class CompressionDictionaryCache implements 
ICompressionDictionaryCache
                              .removalListener((DictId dictId,
                                                CompressionDictionary 
dictionary,
                                                RemovalCause cause) -> {
-                                 // Close dictionary when evicted from cache 
to free native resources
-                                 // SelfRefCounted ensures dictionary won't be 
actually closed if still referenced by compressors
+                                 // Release the cache's reference to the 
dictionary when evicted
+                                 // The dictionary will only be truly cleaned 
up when all references are released
                                  if (dictionary != null)
                                  {
                                      try
                                      {
-                                         dictionary.close();
+                                         dictionary.selfRef().release();
                                      }
                                      catch (Exception e)
                                      {
-                                         logger.warn("Failed to close 
compression dictionary {}", dictId, e);
+                                         logger.warn("Failed to release 
compression dictionary {}", dictId, e);
                                      }
                                  }
                              })
diff --git 
a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java 
b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
index 14c164dcb5..ed83a8a4d5 100644
--- 
a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
+++ 
b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.compression;
 
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -42,8 +42,7 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
     private final int checksum;
     // One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be 
derived from the same raw dictionary content
     private final ConcurrentHashMap<Integer, ZstdDictCompress> 
zstdDictCompressPerLevel = new ConcurrentHashMap<>();
-    private volatile ZstdDictDecompress dictDecompress;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<ZstdDictDecompress> dictDecompress = new 
AtomicReference<>();
     private final Ref<ZstdCompressionDictionary> selfRef;
 
     @VisibleForTesting
@@ -90,7 +89,7 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
     public int estimatedOccupiedMemoryBytes()
     {
         int occupied = rawDictionary.length;
-        occupied += dictDecompress != null ? rawDictionary.length : 0;
+        occupied += dictDecompress.get() != null ? rawDictionary.length : 0;
         occupied += zstdDictCompressPerLevel.size() * rawDictionary.length;
 
         return occupied;
@@ -114,50 +113,65 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
      * Get a pre-processed compression tables that is optimized for 
compression.
      * It is derived/computed from dictionary bytes.
      * The internal data structure is different from the tables for 
decompression.
-     *
+     * <br>
+     * IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this 
dictionary.
+     * The reference counting mechanism ensures tidy() cannot run while 
references exist,
+     * making synchronization unnecessary. This method is safe to call 
concurrently as long
+     * as each caller holds a reference.
+     * <br>
      * @param compressionLevel compression level to create the compression 
table
-     * @return ZstdDictCompress
+     * @return ZstdDictCompress for the specified compression level
+     * @throws IllegalStateException if the dictionary has already been released
      */
     public ZstdDictCompress dictionaryForCompression(int compressionLevel)
     {
-        if (closed.get())
-            throw new IllegalStateException("Dictionary has been closed. " + 
dictId);
-
+        ensureNotReleased();
         ZstdCompressorBase.validateCompressionLevel(compressionLevel);
 
-        return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, 
level -> {
-            if (closed.get())
-                throw new IllegalStateException("Dictionary has been closed");
-            return new ZstdDictCompress(rawDictionary, level);
-        });
+        // Fast path: check if already exists to avoid locking the bin
+        ZstdDictCompress existing = 
zstdDictCompressPerLevel.get(compressionLevel);
+        if (existing != null)
+            return existing;
+
+        // Slow path: create a new dictionary for this compression level
+        // No additional synchronization needed - reference counting prevents 
tidy() while in use
+        return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, 
level ->
+            new ZstdDictCompress(rawDictionary, level));
     }
 
     /**
      * Get a pre-processed decompression tables that is optimized for 
decompression.
      * It is derived/computed from dictionary bytes.
      * The internal data structure is different from the tables for 
compression.
+     * <br>
+     * IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this 
dictionary.
+     * The reference counting mechanism ensures tidy() cannot run while 
references exist,
+     * making synchronization unnecessary. This method is safe to call 
concurrently as long
+     * as each caller holds a reference.
+     * <br>
+     * Thread-safe: Multiple threads can safely call this method concurrently.
+     * The decompression dictionary will be created exactly once on first 
access.
      *
-     * @return ZstdDictDecompress
+     * @return ZstdDictDecompress for decompression operations
+     * @throws IllegalStateException if the dictionary has already been released
      */
     public ZstdDictDecompress dictionaryForDecompression()
     {
-        if (closed.get())
-            throw new IllegalStateException("Dictionary has been closed");
-
-        ZstdDictDecompress result = dictDecompress;
+        ensureNotReleased();
+        // Fast path: if already initialized, return immediately
+        ZstdDictDecompress result = dictDecompress.get();
         if (result != null)
             return result;
 
+        // Slow path: need to initialize with proper double-checked locking
+        // Reference counting guarantees tidy() won't run during this operation
         synchronized (this)
         {
-            if (closed.get())
-                throw new IllegalStateException("Dictionary has been closed");
-
-            result = dictDecompress;
+            result = dictDecompress.get();
             if (result == null)
             {
                 result = new ZstdDictDecompress(rawDictionary);
-                dictDecompress = result;
+                dictDecompress.set(result);
             }
             return result;
         }
@@ -181,30 +195,48 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
         return selfRef.ref();
     }
 
-    @Override
-    public void close()
+    private void ensureNotReleased()
     {
-        if (closed.compareAndSet(false, true))
-        {
-            selfRef.release();
-        }
+        if (selfRef.globalCount() <= 0)
+            throw new IllegalStateException("Dictionary has been released: " + 
dictId);
     }
 
+    /**
+     * Tidy implementation for cleaning up native Zstd resources.
+     *
+     * This class holds direct references to the resources that need cleanup,
+     * avoiding a circular reference pattern where Tidy would hold a reference
+     * to the parent dictionary object.
+     */
     private static class Tidy implements RefCounted.Tidy
     {
         private final ConcurrentHashMap<Integer, ZstdDictCompress> 
zstdDictCompressPerLevel;
-        private volatile ZstdDictDecompress dictDecompress;
+        private final AtomicReference<ZstdDictDecompress> dictDecompress;
 
-        Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> 
zstdDictCompressPerLevel, ZstdDictDecompress dictDecompress)
+        Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> 
zstdDictCompressPerLevel,
+             AtomicReference<ZstdDictDecompress> dictDecompress)
         {
             this.zstdDictCompressPerLevel = zstdDictCompressPerLevel;
             this.dictDecompress = dictDecompress;
         }
 
+        /**
+         * Clean up native resources when reference count reaches zero.
+         *
+         * IMPORTANT: This method is called exactly once when the last 
reference is released.
+         * Reference counting guarantees that no other thread can be executing
+         * dictionaryForCompression/Decompression when this runs, because:
+         * 1. Those methods require holding a valid reference
+         * 2. This only runs when refcount goes from 0 to -1
+         * 3. Once refcount is negative, tryRef() returns null, preventing new 
references
+         *
+         * Therefore, no synchronization is needed - we have exclusive access 
to clean up.
+         */
         @Override
         public void tidy()
         {
             // Close all compression dictionaries
+            // No synchronization needed - reference counting ensures 
exclusive access
             for (ZstdDictCompress compressDict : 
zstdDictCompressPerLevel.values())
             {
                 try
@@ -220,7 +252,7 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
             zstdDictCompressPerLevel.clear();
 
             // Close decompression dictionary
-            ZstdDictDecompress decompressDict = dictDecompress;
+            ZstdDictDecompress decompressDict = dictDecompress.get();
             if (decompressDict != null)
             {
                 try
@@ -231,7 +263,7 @@ public class ZstdCompressionDictionary implements 
CompressionDictionary, SelfRef
                 {
                     logger.warn("Failed to close ZstdDictDecompress", e);
                 }
-                dictDecompress = null;
+                dictDecompress.set(null);
             }
         }
 
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java 
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index f49734a659..bd09b9081c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -133,7 +133,8 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
                                        compressedLength, 
compressionDictionary);
     }
 
-    // do not call this constructor directly, unless used in testing
+    // Do not call this constructor from outside this class file, except in 
tests.
+    // Within this class, use the static open() method or the Writer.open() 
method instead.
     @VisibleForTesting
     public CompressionMetadata(File chunksIndexFile,
                                CompressionParams parameters,
@@ -143,7 +144,8 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
                                long compressedFileLength,
                                CompressionDictionary compressionDictionary)
     {
-        super(chunkOffsets);
+        // Build array with chunkOffsets and a wrapper that releases 
dictionary ref
+        super(buildCloseableArray(chunkOffsets, compressionDictionary));
         this.chunksIndexFile = chunksIndexFile;
         this.parameters = parameters;
         this.dataLength = dataLength;
@@ -153,6 +155,37 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
         this.compressionDictionary = compressionDictionary;
     }
 
+    private static AutoCloseable[] buildCloseableArray(Memory chunkOffsets, 
CompressionDictionary dictionary)
+    {
+        if (dictionary == null)
+            return new AutoCloseable[] { chunkOffsets };
+
+        Ref<? extends CompressionDictionary> dictRef = dictionary.tryRef();
+        if (dictRef == null)
+        {
+            // Close chunkOffsets before throwing to prevent resource leak.
+            // The CompressionMetadata constructor will not complete if we 
throw here,
+            // so we must clean up resources that were passed in.
+            chunkOffsets.close();
+            throw new IllegalStateException("Failed to acquire reference to 
compression dictionary");
+        }
+
+        return new AutoCloseable[] { chunkOffsets, dictRef::release };
+    }
+
+    /**
+     * Copy constructor for creating shared copies via sharedCopy().
+     * <br>
+     * This uses the WrappedSharedCloseable pattern where all copies share the 
same
+     * underlying resources (chunkOffsets Memory and dictionary reference). 
The super()
+     * call increments the shared reference count, and resources are only 
released when
+     * the last copy is closed.
+     * <br>
+     * Reference counting behavior:
+     * - Original CompressionMetadata acquires 1 dictionary reference (in 
buildCloseableArray)
+     * - All copies share that reference (via super(copy) incrementing shared 
ref count)
+     * - When last copy closes, WrappedSharedCloseable.Tidy releases the 
reference once
+     */
     private CompressionMetadata(CompressionMetadata copy)
     {
         super(copy);
@@ -163,11 +196,11 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
         this.chunkOffsets = copy.chunkOffsets;
         this.chunkOffsetsSize = copy.chunkOffsetsSize;
         this.compressionDictionary = copy.compressionDictionary;
+        this.resolvedCompressor = copy.resolvedCompressor;
     }
 
     public ICompressor compressor()
     {
-        // classic double-checked locking to call resolveCompressor method 
just once per CompressionMetadata object
         ICompressor result = resolvedCompressor;
         if (result != null)
             return result;
@@ -228,6 +261,8 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
     {
         super.addTo(identities);
         identities.add(chunkOffsets);
+        // Note: compressionDictionary ref is managed by 
WrappedSharedCloseable,
+        // so it's already tracked through the parent's identity collection
     }
 
     @Override
@@ -408,15 +443,37 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
         // provided by user when setDescriptor
         private long dataLength, chunkCount;
         @Nullable
-        private CompressionDictionary compressionDictionary;
+        private final CompressionDictionary compressionDictionary;
+        @Nullable // Reference to keep dictionary alive during write
+        private Ref<? extends CompressionDictionary> compressionDictionaryRef;
 
         private Writer(CompressionParams parameters, File file, 
CompressionDictionary compressionDictionary)
         {
             this.parameters = parameters;
             this.file = file;
             this.compressionDictionary = compressionDictionary;
+            // Take a reference to ensure dictionary stays alive during 
SSTable write
+            if (compressionDictionary != null)
+            {
+                this.compressionDictionaryRef = compressionDictionary.tryRef();
+                if (compressionDictionaryRef == null)
+                {
+                    // Clean up offsets SafeMemory allocated in field 
initializer before throwing
+                    // to prevent resource leak. The offsets field is 
initialized before constructor
+                    // body runs, so it must be explicitly cleaned up if 
construction fails.
+                    offsets.close();
+                    throw new IllegalStateException("Failed to acquire 
reference to compression dictionary " + compressionDictionary.dictId());
+                }
+            }
         }
 
+        /**
+         * Creates a new Writer for compression metadata.
+         *
+         * Note on resource management: If this method throws an exception, 
all resources
+         * are properly cleaned up. The Writer constructor ensures that if 
dictionary
+         * reference acquisition fails, the offsets SafeMemory is released.
+         */
         public static Writer open(CompressionParams parameters,
                                   File file,
                                   CompressionDictionary compressionDictionary)
@@ -569,12 +626,24 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
         @Override
         protected Throwable doCommit(Throwable accumulate)
         {
+            // Release the dictionary reference after successful write
+            if (compressionDictionaryRef != null)
+            {
+                compressionDictionaryRef.release();
+                compressionDictionaryRef = null;
+            }
             return accumulate;
         }
 
         @Override
         protected Throwable doAbort(Throwable accumulate)
         {
+            // Release the dictionary reference
+            if (compressionDictionaryRef != null)
+            {
+                compressionDictionaryRef.release();
+                compressionDictionaryRef = null;
+            }
             return accumulate;
         }
     }
diff --git 
a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java 
b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
index 3ba5841aaa..59cba54b20 100644
--- a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
@@ -33,6 +33,7 @@ import com.github.luben.zstd.Zstd;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
 import org.apache.cassandra.db.compression.CompressionDictionary.Kind;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -66,7 +67,7 @@ public class ZstdDictionaryCompressor extends 
ZstdCompressorBase implements ICom
 
     /**
      * Create a ZstdDictionaryCompressor with the given options
-     * Invoked by {@link 
org.apache.cassandra.schema.CompressionParams#createCompressor} via reflection
+     * Invoked by {@link 
org.apache.cassandra.schema.CompressionParams#createCompressor(ParameterizedClass)}
 via reflection
      *
      * @param options compression options
      * @return ZstdDictionaryCompressor
@@ -93,11 +94,10 @@ public class ZstdDictionaryCompressor extends 
ZstdCompressorBase implements ICom
 
         return instancePerDict.get(dictionary, dict -> {
             // Get a reference to the dictionary when creating new compressor
-            Ref<ZstdCompressionDictionary> ref = dict != null ? dict.tryRef() 
: null;
-            if (ref == null && dict != null)
+            Ref<ZstdCompressionDictionary> ref = dict.tryRef();
+            if (ref == null)
             {
-                // Dictionary is being closed, cannot create compressor
-                throw new IllegalStateException("Dictionary is being closed");
+                throw new IllegalStateException("Dictionary is released");
             }
             return new ZstdDictionaryCompressor(level, dictionary, ref);
         });
@@ -212,5 +212,6 @@ public class ZstdDictionaryCompressor extends 
ZstdCompressorBase implements ICom
     public static void invalidateCache()
     {
         instancePerDict.invalidateAll();
+        instancePerDict.cleanUp();
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java 
b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
index d09d288d3e..e60be973da 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
@@ -33,15 +33,37 @@ package org.apache.cassandra.utils.concurrent;
 public interface RefCounted<T>
 {
     /**
-     * @return the a new Ref() to the managed object, incrementing its 
refcount, or null if it is already released
+     * Attempts to acquire a new reference to the managed object and
+     * increment the reference count.
+     *
+     * @return a new Ref to the managed object, or null if already released
      */
     public Ref<T> tryRef();
 
+    /**
+     * Acquires a new reference to the managed object and increment the
+     * reference count.
+     *
+     * @return a new Ref to the managed object
+     * @throws IllegalStateException if already released
+     */
     public Ref<T> ref();
 
     public static interface Tidy
     {
+        /**
+         * Performs cleanup of resources when the reference count reaches zero.
+         * Called exactly once when the last reference is released.
+         *
+         * @throws Exception if cleanup fails
+         */
         void tidy() throws Exception;
+
+        /**
+         * Returns a human-readable name for debugging and leak detection.
+         *
+         * @return a descriptive name, typically the class name of the tracked 
object
+         */
         String name();
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
index 878780982d..46565b7c04 100644
--- 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
@@ -421,13 +421,13 @@ public class CompressionDictionaryCacheTest
         }
     }
 
-    private static void closeQuietly(AutoCloseable resource)
+    private static void closeQuietly(ZstdCompressionDictionary resource)
     {
         if (resource != null)
         {
             try
             {
-                resource.close();
+                resource.selfRef().release();
             }
             catch (Exception e)
             {
diff --git 
a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
 
b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
index e054776f96..2b226eed4d 100644
--- 
a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
@@ -68,7 +68,7 @@ public class ZstdCompressionDictionaryTest
     {
         ZstdCompressionDictionary dictionary2 = new 
ZstdCompressionDictionary(SAMPLE_DICT_ID, SAMPLE_DICT_DATA);
         ZstdCompressionDictionary differentIdDict = new 
ZstdCompressionDictionary(
-            new DictId(Kind.ZSTD, 987654321L), SAMPLE_DICT_DATA);
+        new DictId(Kind.ZSTD, 987654321L), SAMPLE_DICT_DATA);
 
         assertThat(dictionary)
         .as("Dictionaries with same ID should be equal")
@@ -82,8 +82,8 @@ public class ZstdCompressionDictionaryTest
         .as("Dictionaries with different IDs should not be equal")
         .isNotEqualTo(differentIdDict);
 
-        dictionary2.close();
-        differentIdDict.close();
+        dictionary2.selfRef().release();
+        differentIdDict.selfRef().release();
     }
 
     @Test
@@ -168,17 +168,48 @@ public class ZstdCompressionDictionaryTest
         dictionary.dictionaryForCompression(3);
         dictionary.dictionaryForDecompression();
 
-        dictionary.close();
+        // Release the self-reference
+        dictionary.selfRef().release();
 
-        assertThatThrownBy(() -> dictionary.dictionaryForCompression(3))
-        .as("Should throw exception when accessing closed dictionary")
+        // After releasing selfRef, tryRef should return null
+        Ref<ZstdCompressionDictionary> ref = dictionary.tryRef();
+        assertThat(ref)
+        .as("tryRef should return null after dictionary is released")
+        .isNull();
+    }
+
+    @Test
+    public void testDictionaryAccessWithoutReference()
+    {
+        // Create a new dictionary for this test
+        ZstdCompressionDictionary testDict = new ZstdCompressionDictionary(
+            new DictId(Kind.ZSTD, 999999L),
+            SAMPLE_DICT_DATA
+        );
+
+        // Access some dictionaries first to initialize them
+        testDict.dictionaryForCompression(3);
+        testDict.dictionaryForDecompression();
+
+        // Release the self-reference to simulate dictionary cleanup
+        testDict.selfRef().release();
+
+        // Verify tryRef returns null after release
+        assertThat(testDict.tryRef())
+        .as("tryRef should return null after dictionary is released")
+        .isNull();
+
+        // Accessing dictionary methods without a valid reference should throw IllegalStateException
+        // This protects against use-after-free bugs in both development and production
+        assertThatThrownBy(() -> testDict.dictionaryForCompression(3))
+        .as("Should throw exception when accessing released dictionary")
         .isInstanceOf(IllegalStateException.class)
-        .hasMessageContaining("Dictionary has been closed");
+        .hasMessageContaining("Dictionary has been released");
 
-        assertThatThrownBy(() -> dictionary.dictionaryForDecompression())
-        .as("Should throw exception when accessing closed dictionary")
+        assertThatThrownBy(() -> testDict.dictionaryForDecompression())
+        .as("Should throw exception when accessing released dictionary")
         .isInstanceOf(IllegalStateException.class)
-        .hasMessageContaining("Dictionary has been closed");
+        .hasMessageContaining("Dictionary has been released");
     }
 
     @Test
@@ -229,15 +260,16 @@ public class ZstdCompressionDictionaryTest
     @Test
     public void testReferenceAfterClose()
     {
-        dictionary.close();
+        // Release the self-reference
+        dictionary.selfRef().release();
 
         assertThatThrownBy(() -> dictionary.ref())
-        .as("Should not be able to get reference after close")
+        .as("Should not be able to get reference after release")
         .isInstanceOf(AssertionError.class);
 
         Ref<ZstdCompressionDictionary> tryRef = dictionary.tryRef();
         assertThat(tryRef)
-        .as("tryRef should return null after close")
+        .as("tryRef should return null after release")
         .isNull();
     }
 
@@ -355,8 +387,8 @@ public class ZstdCompressionDictionaryTest
         .isNotNull()
         .isEqualTo(dict2);
 
-        dict1.close();
-        dict2.close();
+        dict1.selfRef().release();
+        dict2.selfRef().release();
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
index 560a0af635..801a07c151 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
@@ -19,14 +19,21 @@
 package org.apache.cassandra.io.compress;
 
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compression.CompressionDictionary;
+import org.apache.cassandra.db.compression.CompressionDictionaryCache;
+import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.schema.CompressionParams;
 
+import static org.apache.cassandra.Util.spinAssertEquals;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class CompressionMetadataTest
 {
@@ -35,6 +42,12 @@ public class CompressionMetadataTest
     long dataLength = 1000;
     long compressedFileLength = 100;
 
+    @BeforeClass
+    public static void setUpClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
     private CompressionMetadata newCompressionMetadata(Memory memory)
     {
         return new CompressionMetadata(chunksIndexFile,
@@ -76,4 +89,170 @@ public class CompressionMetadataTest
         assertThat(copy.isCleanedUp()).isTrue();
         assertThatExceptionOfType(AssertionError.class).isThrownBy(memory::size);
     }
+
+    /**
+     * Test that simulates the Zstd with dictionary → LZ4 schema change scenario. (CASSANDRA-21047)
+     */
+    @Test
+    public void testDictionaryRefCountingDuringSchemaChange()
+    {
+        // Step 1: Create a dictionary and add it to cache (simulating table with Zstd compression)
+        CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.ZSTD, 1L);
+        byte[] dictBytes = "sample dictionary data for compression".getBytes();
+        ZstdCompressionDictionary dictionary = new ZstdCompressionDictionary(dictId, dictBytes);
+
+        assertThat(dictionary.selfRef().globalCount()).isOne();
+        CompressionDictionaryCache cache = new CompressionDictionaryCache();
+        cache.add(dictionary);
+
+        // Verify dictionary is in cache
+        CompressionDictionary cachedDict = cache.get(dictId);
+        assertThat(cachedDict)
+        .as("Dictionary should be cached")
+        .isNotNull()
+        .isSameAs(dictionary);
+        assertThat(dictionary.selfRef().globalCount()).isOne();
+
+        // Step 2: Open SSTable with dictionary (simulating CompressionMetadata)
+        Memory memory = Memory.allocate(100);
+        CompressionMetadata metadata = new CompressionMetadata(
+        chunksIndexFile,
+        params,
+        memory,
+        memory.size(),
+        dataLength,
+        compressedFileLength,
+        dictionary  // CompressionMetadata takes a reference
+        );
+        // RC is incremented
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+
+        // Verify we can get the compressor (which uses the dictionary)
+        ICompressor compressor = metadata.compressor();
+        assertThat(compressor)
+        .as("Should be able to get compressor with dictionary")
+        .isNotNull();
+        // RC is incremented
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+
+        // Step 3: Schema change - close cache (simulating switch from Zstd to LZ4)
+        // This releases the cache's reference to the dictionary
+        cache.close();
+        // RC is decremented
+        spinAssertEquals(2, () -> dictionary.selfRef().globalCount());
+
+        // Step 4: Verify dictionary is still usable via CompressionMetadata
+        // This is the key test - before the fix, this would fail with "Dictionary has been closed"
+        // compressor() should return the cached compressor; this does not change the reference count
+        ICompressor compressorAfterCacheClosed = metadata.compressor();
+        assertThat(compressorAfterCacheClosed)
+        .as("Compressor should still be accessible after cache is closed")
+        .isNotNull()
+        .isSameAs(compressor);  // Should return the same cached compressor
+
+        // Verify dictionary methods still work
+        assertThat(dictionary.dictionaryForDecompression())
+        .as("Dictionary decompression should still work")
+        .isNotNull();
+
+        // Step 5: Close CompressionMetadata (simulating SSTable being compacted away)
+        metadata.close();
+        // RC is decremented
+        assertThat(dictionary.selfRef().globalCount()).isOne();
+
+        // Step 6: Dictionary is still usable because ZstdDictionaryCompressor cache holds a reference
+        // This is expected behavior - the compressor cache is global and may keep dictionaries
+        // alive for reuse across SSTables
+        assertThat(dictionary.dictionaryForDecompression())
+        .as("Dictionary should still work due to compressor cache")
+        .isNotNull();
+
+        ZstdDictionaryCompressor.invalidateCache();
+        // RC is decremented
+        spinAssertEquals(0, () -> dictionary.selfRef().globalCount());
+        assertThatThrownBy(dictionary::dictionaryForDecompression)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Dictionary has been released");
+    }
+
+    /**
+     * Test multiple CompressionMetadata instances sharing the same dictionary
+     * and surviving cache eviction. (CASSANDRA-21047)
+     */
+    @Test
+    public void testMultipleMetadataInstancesSharingDictionary()
+    {
+        // Create dictionary and cache
+        CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.ZSTD, 2L);
+        byte[] dictBytes = "shared dictionary data".getBytes();
+        ZstdCompressionDictionary dictionary = new ZstdCompressionDictionary(dictId, dictBytes);
+
+        assertThat(dictionary.selfRef().globalCount()).isOne();
+        CompressionDictionaryCache cache = new CompressionDictionaryCache();
+        cache.add(dictionary);
+
+        // Create multiple CompressionMetadata instances (simulating multiple SSTables)
+        Memory memory1 = Memory.allocate(100);
+        CompressionMetadata metadata1 = new CompressionMetadata(
+        chunksIndexFile, params, memory1, memory1.size(),
+        dataLength, compressedFileLength, dictionary
+        );
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+
+        Memory memory2 = Memory.allocate(100);
+        CompressionMetadata metadata2 = new CompressionMetadata(
+        chunksIndexFile, params, memory2, memory2.size(),
+        dataLength, compressedFileLength, dictionary
+        );
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+
+        Memory memory3 = Memory.allocate(100);
+        CompressionMetadata metadata3 = new CompressionMetadata(
+        chunksIndexFile, params, memory3, memory3.size(),
+        dataLength, compressedFileLength, dictionary
+        );
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(4);
+
+        // All should be able to get compressors
+        assertThat(metadata1.compressor()).isNotNull();
+        assertThat(metadata2.compressor()).isNotNull();
+        assertThat(metadata3.compressor()).isNotNull();
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(5);
+
+        // Close cache (schema change)
+        cache.close();
+        spinAssertEquals(4, () -> dictionary.selfRef().globalCount());
+
+        // All metadata instances should still work
+        assertThat(metadata1.compressor()).isNotNull();
+        assertThat(metadata2.compressor()).isNotNull();
+        assertThat(metadata3.compressor()).isNotNull();
+
+        // Close metadata instances one by one
+        metadata1.close();
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+        assertThat(metadata2.compressor())
+        .as("Other metadata should still work")
+        .isNotNull();
+        assertThat(metadata3.compressor())
+        .as("Other metadata should still work")
+        .isNotNull();
+
+        metadata2.close();
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+        assertThat(metadata3.compressor())
+        .as("Last metadata should still work")
+        .isNotNull();
+
+        // Close last instance - only the compressor cache's reference should remain
+        metadata3.close();
+        assertThat(dictionary.selfRef().globalCount()).isEqualTo(1);
+
+        ZstdDictionaryCompressor.invalidateCache();
+        // RC is decremented
+        spinAssertEquals(0, () -> dictionary.selfRef().globalCount());
+        assertThatThrownBy(dictionary::dictionaryForDecompression)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Dictionary has been released");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to