This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21382e6874 Fix CompressionDictionary being closed while still in use
21382e6874 is described below
commit 21382e68744eaa64881f9e3953c21cecf7aea199
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Dec 3 22:12:55 2025 -0800
Fix CompressionDictionary being closed while still in use
Patch by Yifan Cai; Reviewed by Stefan Miklosovic for CASSANDRA-21047
---
CHANGES.txt | 1 +
.../db/compression/CompressionDictionary.java | 82 +++++++++-
.../db/compression/CompressionDictionaryCache.java | 8 +-
.../db/compression/ZstdCompressionDictionary.java | 100 ++++++++----
.../cassandra/io/compress/CompressionMetadata.java | 77 ++++++++-
.../io/compress/ZstdDictionaryCompressor.java | 11 +-
.../cassandra/utils/concurrent/RefCounted.java | 24 ++-
.../CompressionDictionaryCacheTest.java | 4 +-
.../compression/ZstdCompressionDictionaryTest.java | 62 +++++--
.../io/compress/CompressionMetadataTest.java | 179 +++++++++++++++++++++
10 files changed, 480 insertions(+), 68 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cd56b9ed2..c699a8f26b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
* When updating a multi cell collection element, if the update is rejected
then the shared Row.Builder is not freed causing all future mutations to be
rejected (CASSANDRA-21055)
* Schema annotations escape validation on CREATE and ALTER DDL statements
(CASSANDRA-21046)
* Calculate once and cache the result of ModificationStatement#requiresRead
as a perf optimization (CASSANDRA-21040)
diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index b3725cc815..fff6255c3f 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.Objects;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -32,8 +33,41 @@ import com.google.common.hash.Hashing;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
+import org.apache.cassandra.utils.concurrent.Ref;
-public interface CompressionDictionary extends AutoCloseable
+/**
+ * Interface for compression dictionaries with reference-counted lifecycle management.
+ *
+ * <h2>Reference Counting Model</h2>
+ * Compression dictionaries hold native resources that must be explicitly managed. This interface
+ * uses {@link Ref} for safe lifecycle management across multiple concurrent users.
+ *
+ * <h3>Ownership and Usage in Cassandra</h3>
+ * <ul>
+ * <li><b>CompressionDictionaryManager</b>: Holds the primary reference ({@link #selfRef()}) for cached dictionaries</li>
+ * <li><b>CompressionMetadata.Writer</b>: Acquires a reference during SSTable write, held for the writer's lifetime</li>
+ * <li><b>CompressionMetadata</b>: Acquires a reference when created (via {@link #tryRef()}), held for the SSTable reader's lifetime.
+ *     All copies created via sharedCopy() share this single reference through WrappedSharedCloseable</li>
+ * </ul>
+ *
+ * <h3>Correctness Guarantee</h3>
+ * The reference counting prevents premature cleanup of native resources:
+ * <ol>
+ * <li>CompressionMetadata acquires a reference when an SSTable is opened</li>
+ * <li>Native resources remain valid as long as any reference exists (refcount > 0)</li>
+ * <li>Even if the cache evicts the dictionary, the SSTable's reference keeps resources alive</li>
+ * <li>Cleanup runs exactly once when the last reference is released (refcount goes 0 → -1)</li>
+ * <li>After cleanup, {@link #tryRef()} returns null, preventing new references to released resources</li>
+ * </ol>
+ *
+ * This ensures dictionaries cannot be freed while SSTables are using them for compression/decompression,
+ * even when the cache evicts the dictionary concurrently.
+ *
+ * @see Ref for reference counting implementation
+ * @see CompressionDictionaryManager for cache management
+ * @see org.apache.cassandra.io.compress.CompressionMetadata for SSTable usage
+ */
+public interface CompressionDictionary
{
/**
* Get the dictionary id
@@ -75,6 +109,48 @@ public interface CompressionDictionary extends AutoCloseable
return dictId().kind;
}
+    /**
+     * Try to acquire a new reference to this dictionary.
+     * Returns null if the dictionary is already released.
+     * <p>
+     * The caller must ensure the returned reference is released when no longer needed,
+     * either by calling {@code ref.release()} or {@code ref.close()} (they are equivalent).
+     * Failing to release the reference will prevent cleanup of native resources and cause
+     * a memory leak.
+     *
+     * @return a new reference to this dictionary, or null if already released
+     */
+    Ref<? extends CompressionDictionary> tryRef();
+
+    /**
+     * Get the self-reference of this dictionary.
+     * This is used to release the primary reference held by the cache.
+     *
+     * @return the self-reference
+     */
+    Ref<? extends CompressionDictionary> selfRef();
+
+    /**
+     * Releases the self-reference of this dictionary.
+     * This is a convenience method equivalent to calling {@code selfRef().close()}.
+     * <p>
+     * This method is idempotent - calling it multiple times is safe and will only
+     * release the self-reference once. Subsequent calls have no effect.
+     * <p>
+     * This method is typically used when creating a dictionary outside the cache
+     * (e.g., in tests or temporary usage) and needing to clean it up. For dictionaries
+     * managed by the cache, the cache's removal listener handles cleanup via
+     * {@code selfRef().release()}.
+     *
+     * @see #selfRef()
+     * @see #tryRef()
+     */
+    @VisibleForTesting
+    default void close()
+    {
+        selfRef().close();
+    }
+
/**
* Write compression dictionary to file
*
@@ -192,7 +268,7 @@ public interface CompressionDictionary extends AutoCloseable
            if (dict.length != storedLength)
            {
                throw new IllegalStateException(String.format("Dictionary length mismatch for %s dict id %d. Expected: %d, actual: %d",
-                                                              kindStr, dictId, storedLength, dict.length));
+                                                               kindStr, dictId, storedLength, dict.length));
            }
            // Validate checksum
@@ -200,7 +276,7 @@ public interface CompressionDictionary extends AutoCloseable
            if (calculatedChecksum != storedChecksum)
            {
                throw new IllegalStateException(String.format("Dictionary checksum mismatch for %s dict id %d. Expected: %d, actual: %d",
-                                                              kindStr, dictId, storedChecksum, calculatedChecksum));
+                                                               kindStr, dictId, storedChecksum, calculatedChecksum));
            }
            return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum);
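
For callers of the new interface, usage follows the standard acquire/use/release pattern. A minimal sketch (useDictionary is an illustrative placeholder, not code from this patch):

    Ref<? extends CompressionDictionary> ref = dictionary.tryRef();
    if (ref == null)
        throw new IllegalStateException("Dictionary has been released"); // already tidied
    try
    {
        useDictionary(dictionary); // safe: tidy() cannot run while this ref is held
    }
    finally
    {
        ref.release(); // equivalent to ref.close(); the last release triggers tidy()
    }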
diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
index 59d64b43e7..a58bd0e386 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
@@ -62,17 +62,17 @@ public class CompressionDictionaryCache implements ICompressionDictionaryCache
                           .removalListener((DictId dictId,
                                             CompressionDictionary dictionary,
                                             RemovalCause cause) -> {
-                              // Close dictionary when evicted from cache to free native resources
-                              // SelfRefCounted ensures dictionary won't be actually closed if still referenced by compressors
+                              // Release the cache's reference to the dictionary when evicted
+                              // The dictionary will only be truly cleaned up when all references are released
                               if (dictionary != null)
                               {
                                   try
                                   {
-                                      dictionary.close();
+                                      dictionary.selfRef().release();
                                   }
                                   catch (Exception e)
                                   {
-                                      logger.warn("Failed to close compression dictionary {}", dictId, e);
+                                      logger.warn("Failed to release compression dictionary {}", dictId, e);
                                   }
                               }
                           })
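
Because eviction now merely drops the cache's primary reference, a reader racing with eviction stays safe by promoting the cached value to its own reference before use (an illustrative pattern, not code from this patch):

    CompressionDictionary dict = cache.get(dictId);
    Ref<? extends CompressionDictionary> ref = dict == null ? null : dict.tryRef();
    if (ref == null)
    {
        // Dictionary was evicted and fully released between get() and tryRef():
        // treat this as a cache miss and reload the dictionary.
    }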
diff --git a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
index 14c164dcb5..ed83a8a4d5 100644
--- a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.compression;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -42,8 +42,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
    private final int checksum;
    // One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be derived from the same raw dictionary content
    private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel = new ConcurrentHashMap<>();
-    private volatile ZstdDictDecompress dictDecompress;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<ZstdDictDecompress> dictDecompress = new AtomicReference<>();
private final Ref<ZstdCompressionDictionary> selfRef;
@VisibleForTesting
@@ -90,7 +89,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
public int estimatedOccupiedMemoryBytes()
{
int occupied = rawDictionary.length;
- occupied += dictDecompress != null ? rawDictionary.length : 0;
+ occupied += dictDecompress.get() != null ? rawDictionary.length : 0;
occupied += zstdDictCompressPerLevel.size() * rawDictionary.length;
return occupied;
@@ -114,50 +113,65 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
     * Get a pre-processed compression tables that is optimized for compression.
     * It is derived/computed from dictionary bytes.
     * The internal data structure is different from the tables for decompression.
-     *
+     * <br>
+     * IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this dictionary.
+     * The reference counting mechanism ensures tidy() cannot run while references exist,
+     * making synchronization unnecessary. This method is safe to call concurrently as long
+     * as each caller holds a reference.
+     * <br>
     * @param compressionLevel compression level to create the compression table
-     * @return ZstdDictCompress
+     * @return ZstdDictCompress for the specified compression level
+     * @throws IllegalStateException if called without holding a valid reference
     */
    public ZstdDictCompress dictionaryForCompression(int compressionLevel)
    {
-        if (closed.get())
-            throw new IllegalStateException("Dictionary has been closed. " + dictId);
-
+        ensureNotReleased();
        ZstdCompressorBase.validateCompressionLevel(compressionLevel);
-        return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, level -> {
-            if (closed.get())
-                throw new IllegalStateException("Dictionary has been closed");
-            return new ZstdDictCompress(rawDictionary, level);
-        });
+        // Fast path: check if already exists to avoid locking the bin
+        ZstdDictCompress existing = zstdDictCompressPerLevel.get(compressionLevel);
+        if (existing != null)
+            return existing;
+
+        // Slower path: create a new dictionary for this compression level
+        // No additional synchronization needed - reference counting prevents tidy() while in use
+        return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, level ->
+                                                        new ZstdDictCompress(rawDictionary, level));
    }
    /**
     * Get a pre-processed decompression tables that is optimized for decompression.
     * It is derived/computed from dictionary bytes.
     * The internal data structure is different from the tables for compression.
+     * <br>
+     * IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this dictionary.
+     * The reference counting mechanism ensures tidy() cannot run while references exist,
+     * making synchronization unnecessary. This method is safe to call concurrently as long
+     * as each caller holds a reference.
+     * <br>
+     * Thread-safe: Multiple threads can safely call this method concurrently.
+     * The decompression dictionary will be created exactly once on first access.
     *
-     * @return ZstdDictDecompress
+     * @return ZstdDictDecompress for decompression operations
+     * @throws IllegalStateException if called without holding a valid reference
     */
public ZstdDictDecompress dictionaryForDecompression()
{
- if (closed.get())
- throw new IllegalStateException("Dictionary has been closed");
-
- ZstdDictDecompress result = dictDecompress;
+ ensureNotReleased();
+ // Fast path: if already initialized, return immediately
+ ZstdDictDecompress result = dictDecompress.get();
if (result != null)
return result;
+ // Slow path: need to initialize with proper double-checked locking
+ // Reference counting guarantees tidy() won't run during this operation
synchronized (this)
{
- if (closed.get())
- throw new IllegalStateException("Dictionary has been closed");
-
- result = dictDecompress;
+ result = dictDecompress.get();
if (result == null)
{
result = new ZstdDictDecompress(rawDictionary);
- dictDecompress = result;
+ dictDecompress.set(result);
}
return result;
}
@@ -181,30 +195,48 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
return selfRef.ref();
}
- @Override
- public void close()
+ private void ensureNotReleased()
{
- if (closed.compareAndSet(false, true))
- {
- selfRef.release();
- }
+        if (selfRef.globalCount() <= 0)
+            throw new IllegalStateException("Dictionary has been released: " + dictId);
}
+ /**
+ * Tidy implementation for cleaning up native Zstd resources.
+ *
+ * This class holds direct references to the resources that need cleanup,
+ * avoiding a circular reference pattern where Tidy would hold a reference
+ * to the parent dictionary object.
+ */
private static class Tidy implements RefCounted.Tidy
{
        private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel;
-        private volatile ZstdDictDecompress dictDecompress;
+        private final AtomicReference<ZstdDictDecompress> dictDecompress;

-        Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel, ZstdDictDecompress dictDecompress)
+        Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel,
+             AtomicReference<ZstdDictDecompress> dictDecompress)
        {
            this.zstdDictCompressPerLevel = zstdDictCompressPerLevel;
            this.dictDecompress = dictDecompress;
        }
+        /**
+         * Clean up native resources when the reference count reaches zero.
+         *
+         * IMPORTANT: This method is called exactly once when the last reference is released.
+         * Reference counting guarantees that no other thread can be executing
+         * dictionaryForCompression/Decompression when this runs, because:
+         * 1. Those methods require holding a valid reference
+         * 2. This only runs when refcount goes from 0 to -1
+         * 3. Once refcount is negative, tryRef() returns null, preventing new references
+         *
+         * Therefore, no synchronization is needed - we have exclusive access to clean up.
+         */
@Override
public void tidy()
{
            // Close all compression dictionaries
+            // No synchronization needed - reference counting ensures exclusive access
            for (ZstdDictCompress compressDict : zstdDictCompressPerLevel.values())
{
try
@@ -220,7 +252,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
zstdDictCompressPerLevel.clear();
// Close decompression dictionary
- ZstdDictDecompress decompressDict = dictDecompress;
+ ZstdDictDecompress decompressDict = dictDecompress.get();
if (decompressDict != null)
{
try
@@ -231,7 +263,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
{
logger.warn("Failed to close ZstdDictDecompress", e);
}
- dictDecompress = null;
+ dictDecompress.set(null);
}
}
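
The Tidy above deliberately holds only the resource containers, never the parent dictionary, so the dictionary itself stays collectable and leak-detectable. The self-reference wiring follows the usual Cassandra idiom; a simplified sketch, assuming Ref's (referent, Tidy) constructor as used elsewhere in the codebase:

    // The self-reference owns the Tidy; tidy() runs once, when the last
    // reference (including this primary one) has been released.
    this.selfRef = new Ref<>(this, new Tidy(zstdDictCompressPerLevel, dictDecompress));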
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index f49734a659..bd09b9081c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -133,7 +133,8 @@ public class CompressionMetadata extends WrappedSharedCloseable
compressedLength,
compressionDictionary);
}
-    // do not call this constructor directly, unless used in testing
+    // Do not call this constructor from outside this class file, except in tests.
+    // Within this class, use the static open() method or the Writer.open() method instead.
@VisibleForTesting
public CompressionMetadata(File chunksIndexFile,
CompressionParams parameters,
@@ -143,7 +144,8 @@ public class CompressionMetadata extends WrappedSharedCloseable
long compressedFileLength,
CompressionDictionary compressionDictionary)
{
-        super(chunkOffsets);
+        // Build array with chunkOffsets and a wrapper that releases dictionary ref
+        super(buildCloseableArray(chunkOffsets, compressionDictionary));
this.chunksIndexFile = chunksIndexFile;
this.parameters = parameters;
this.dataLength = dataLength;
@@ -153,6 +155,37 @@ public class CompressionMetadata extends WrappedSharedCloseable
this.compressionDictionary = compressionDictionary;
}
+    private static AutoCloseable[] buildCloseableArray(Memory chunkOffsets, CompressionDictionary dictionary)
+    {
+        if (dictionary == null)
+            return new AutoCloseable[] { chunkOffsets };
+
+        Ref<? extends CompressionDictionary> dictRef = dictionary.tryRef();
+        if (dictRef == null)
+        {
+            // Close chunkOffsets before throwing to prevent resource leak.
+            // The CompressionMetadata constructor will not complete if we throw here,
+            // so we must clean up resources that were passed in.
+            chunkOffsets.close();
+            throw new IllegalStateException("Failed to acquire reference to compression dictionary");
+        }
+
+        return new AutoCloseable[] { chunkOffsets, dictRef::release };
+    }
+
+    /**
+     * Copy constructor for creating shared copies via sharedCopy().
+     * <br>
+     * This uses the WrappedSharedCloseable pattern where all copies share the same
+     * underlying resources (chunkOffsets Memory and dictionary reference). The super()
+     * call increments the shared reference count, and resources are only released when
+     * the last copy is closed.
+     * <br>
+     * Reference counting behavior:
+     * - Original CompressionMetadata acquires 1 dictionary reference (in buildCloseableArray)
+     * - All copies share that reference (via super(copy) incrementing shared ref count)
+     * - When last copy closes, WrappedSharedCloseable.Tidy releases the reference once
+     */
private CompressionMetadata(CompressionMetadata copy)
{
super(copy);
@@ -163,11 +196,11 @@ public class CompressionMetadata extends WrappedSharedCloseable
this.chunkOffsets = copy.chunkOffsets;
this.chunkOffsetsSize = copy.chunkOffsetsSize;
this.compressionDictionary = copy.compressionDictionary;
+ this.resolvedCompressor = copy.resolvedCompressor;
}
public ICompressor compressor()
{
-        // classic double-checked locking to call resolveCompressor method just once per CompressionMetadata object
ICompressor result = resolvedCompressor;
if (result != null)
return result;
@@ -228,6 +261,8 @@ public class CompressionMetadata extends WrappedSharedCloseable
{
super.addTo(identities);
identities.add(chunkOffsets);
+        // Note: compressionDictionary ref is managed by WrappedSharedCloseable,
+        // so it's already tracked through the parent's identity collection
}
@Override
@@ -408,15 +443,37 @@ public class CompressionMetadata extends WrappedSharedCloseable
// provided by user when setDescriptor
private long dataLength, chunkCount;
@Nullable
- private CompressionDictionary compressionDictionary;
+ private final CompressionDictionary compressionDictionary;
+ @Nullable // Reference to keep dictionary alive during write
+ private Ref<? extends CompressionDictionary> compressionDictionaryRef;
        private Writer(CompressionParams parameters, File file, CompressionDictionary compressionDictionary)
{
this.parameters = parameters;
this.file = file;
this.compressionDictionary = compressionDictionary;
+            // Take a reference to ensure dictionary stays alive during SSTable write
+            if (compressionDictionary != null)
+            {
+                this.compressionDictionaryRef = compressionDictionary.tryRef();
+                if (compressionDictionaryRef == null)
+                {
+                    // Clean up offsets SafeMemory allocated in field initializer before throwing
+                    // to prevent resource leak. The offsets field is initialized before constructor
+                    // body runs, so it must be explicitly cleaned up if construction fails.
+                    offsets.close();
+                    throw new IllegalStateException("Failed to acquire reference to compression dictionary " + compressionDictionary.dictId());
+                }
+            }
        }
+        /**
+         * Creates a new Writer for compression metadata.
+         *
+         * Note on resource management: If this method throws an exception, all resources
+         * are properly cleaned up. The Writer constructor ensures that if dictionary
+         * reference acquisition fails, the offsets SafeMemory is released.
+         */
public static Writer open(CompressionParams parameters,
File file,
CompressionDictionary compressionDictionary)
@@ -569,12 +626,24 @@ public class CompressionMetadata extends WrappedSharedCloseable
@Override
protected Throwable doCommit(Throwable accumulate)
{
+ // Release the dictionary reference after successful write
+ if (compressionDictionaryRef != null)
+ {
+ compressionDictionaryRef.release();
+ compressionDictionaryRef = null;
+ }
return accumulate;
}
@Override
protected Throwable doAbort(Throwable accumulate)
{
+ // Release the dictionary reference
+ if (compressionDictionaryRef != null)
+ {
+ compressionDictionaryRef.release();
+ compressionDictionaryRef = null;
+ }
return accumulate;
}
}
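
Since doCommit and doAbort perform the identical release-and-null sequence, the logic could be factored into a single idempotent helper. A hypothetical sketch (releaseDictionaryRef is an illustrative name, not part of this patch):

    private void releaseDictionaryRef()
    {
        // Nulling the field makes release idempotent: whichever of
        // doCommit/doAbort runs first releases the ref; later calls are no-ops.
        if (compressionDictionaryRef != null)
        {
            compressionDictionaryRef.release();
            compressionDictionaryRef = null;
        }
    }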
diff --git a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
index 3ba5841aaa..59cba54b20 100644
--- a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
@@ -33,6 +33,7 @@ import com.github.luben.zstd.Zstd;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
import org.apache.cassandra.db.compression.CompressionDictionary.Kind;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -66,7 +67,7 @@ public class ZstdDictionaryCompressor extends ZstdCompressorBase implements ICom
/**
* Create a ZstdDictionaryCompressor with the given options
-     * Invoked by {@link org.apache.cassandra.schema.CompressionParams#createCompressor} via reflection
+     * Invoked by {@link org.apache.cassandra.schema.CompressionParams#createCompressor(ParameterizedClass)} via reflection
*
* @param options compression options
* @return ZstdDictionaryCompressor
@@ -93,11 +94,10 @@ public class ZstdDictionaryCompressor extends ZstdCompressorBase implements ICom
return instancePerDict.get(dictionary, dict -> {
// Get a reference to the dictionary when creating new compressor
-            Ref<ZstdCompressionDictionary> ref = dict != null ? dict.tryRef() : null;
- if (ref == null && dict != null)
+ Ref<ZstdCompressionDictionary> ref = dict.tryRef();
+ if (ref == null)
{
- // Dictionary is being closed, cannot create compressor
- throw new IllegalStateException("Dictionary is being closed");
+ throw new IllegalStateException("Dictionary is released");
}
return new ZstdDictionaryCompressor(level, dictionary, ref);
});
@@ -212,5 +212,6 @@ public class ZstdDictionaryCompressor extends ZstdCompressorBase implements ICom
public static void invalidateCache()
{
instancePerDict.invalidateAll();
+ instancePerDict.cleanUp();
}
}
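
The added cleanUp() call matters because Caffeine invokes removal listeners asynchronously by default: invalidateAll() discards the entries, but the listener that releases each compressor's dictionary reference may run later, which is why the new tests spin-wait on the refcount. A sketch of the pattern, assuming a cache whose removal listener releases the compressor's dictionary Ref:

    instancePerDict.invalidateAll(); // discard entries; listeners may be deferred
    instancePerDict.cleanUp();       // run pending maintenance now, prompting the releases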
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
index d09d288d3e..e60be973da 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/RefCounted.java
@@ -33,15 +33,37 @@ package org.apache.cassandra.utils.concurrent;
public interface RefCounted<T>
{
/**
-     * @return the a new Ref() to the managed object, incrementing its refcount, or null if it is already released
+ * Attempts to acquire a new reference to the managed object and
+ * increment the reference count.
+ *
+ * @return a new Ref to the managed object, or null if already released
*/
public Ref<T> tryRef();
+ /**
+     * Acquires a new reference to the managed object and increments the
+     * reference count.
+ *
+ * @return a new Ref to the managed object
+ * @throws IllegalStateException if already released
+ */
public Ref<T> ref();
public static interface Tidy
{
+ /**
+ * Performs cleanup of resources when the reference count reaches zero.
+ * Called exactly once when the last reference is released.
+ *
+ * @throws Exception if cleanup fails
+ */
void tidy() throws Exception;
+
+ /**
+ * Returns a human-readable name for debugging and leak detection.
+ *
+         * @return a descriptive name, typically the class name of the tracked object
+ */
String name();
}
}
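
To make the contract concrete, a minimal RefCounted implementation might look like this (an illustrative sketch; NativeBuffer and freeNative are hypothetical names, and it assumes Ref's (referent, Tidy) constructor used elsewhere in this package):

    public class NativeBuffer implements RefCounted<NativeBuffer>
    {
        private final Ref<NativeBuffer> selfRef = new Ref<>(this, new Tidy()
        {
            // Runs exactly once, when the last reference is released
            public void tidy() { freeNative(); }
            // Reported by leak detection to identify the tracked object
            public String name() { return "NativeBuffer"; }
        });

        public Ref<NativeBuffer> tryRef() { return selfRef.tryRef(); }
        public Ref<NativeBuffer> ref() { return selfRef.ref(); }

        private static void freeNative() { /* release native memory */ }
    }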
diff --git a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
index 878780982d..46565b7c04 100644
--- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
@@ -421,13 +421,13 @@ public class CompressionDictionaryCacheTest
}
}
- private static void closeQuietly(AutoCloseable resource)
+ private static void closeQuietly(ZstdCompressionDictionary resource)
{
if (resource != null)
{
try
{
- resource.close();
+ resource.selfRef().release();
}
catch (Exception e)
{
diff --git a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
index e054776f96..2b226eed4d 100644
--- a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
+++ b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
@@ -68,7 +68,7 @@ public class ZstdCompressionDictionaryTest
{
        ZstdCompressionDictionary dictionary2 = new ZstdCompressionDictionary(SAMPLE_DICT_ID, SAMPLE_DICT_DATA);
        ZstdCompressionDictionary differentIdDict = new ZstdCompressionDictionary(
-        new DictId(Kind.ZSTD, 987654321L), SAMPLE_DICT_DATA);
+            new DictId(Kind.ZSTD, 987654321L), SAMPLE_DICT_DATA);
assertThat(dictionary)
.as("Dictionaries with same ID should be equal")
@@ -82,8 +82,8 @@ public class ZstdCompressionDictionaryTest
.as("Dictionaries with different IDs should not be equal")
.isNotEqualTo(differentIdDict);
- dictionary2.close();
- differentIdDict.close();
+ dictionary2.selfRef().release();
+ differentIdDict.selfRef().release();
}
@Test
@@ -168,17 +168,48 @@ public class ZstdCompressionDictionaryTest
dictionary.dictionaryForCompression(3);
dictionary.dictionaryForDecompression();
- dictionary.close();
+ // Release the self-reference
+ dictionary.selfRef().release();
- assertThatThrownBy(() -> dictionary.dictionaryForCompression(3))
- .as("Should throw exception when accessing closed dictionary")
+ // After releasing selfRef, tryRef should return null
+ Ref<ZstdCompressionDictionary> ref = dictionary.tryRef();
+ assertThat(ref)
+ .as("tryRef should return null after dictionary is released")
+ .isNull();
+ }
+
+ @Test
+ public void testDictionaryAccessWithoutReference()
+ {
+ // Create a new dictionary for this test
+ ZstdCompressionDictionary testDict = new ZstdCompressionDictionary(
+ new DictId(Kind.ZSTD, 999999L),
+ SAMPLE_DICT_DATA
+ );
+
+ // Access some dictionaries first to initialize them
+ testDict.dictionaryForCompression(3);
+ testDict.dictionaryForDecompression();
+
+ // Release the self-reference to simulate dictionary cleanup
+ testDict.selfRef().release();
+
+ // Verify tryRef returns null after release
+ assertThat(testDict.tryRef())
+ .as("tryRef should return null after dictionary is released")
+ .isNull();
+
+        // Accessing dictionary methods without a valid reference should throw IllegalStateException
+        // This protects against use-after-free bugs in both development and production
+ assertThatThrownBy(() -> testDict.dictionaryForCompression(3))
+ .as("Should throw exception when accessing released dictionary")
.isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Dictionary has been closed");
+ .hasMessageContaining("Dictionary has been released");
- assertThatThrownBy(() -> dictionary.dictionaryForDecompression())
- .as("Should throw exception when accessing closed dictionary")
+ assertThatThrownBy(() -> testDict.dictionaryForDecompression())
+ .as("Should throw exception when accessing released dictionary")
.isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Dictionary has been closed");
+ .hasMessageContaining("Dictionary has been released");
}
@Test
@@ -229,15 +260,16 @@ public class ZstdCompressionDictionaryTest
@Test
public void testReferenceAfterClose()
{
- dictionary.close();
+ // Release the self-reference
+ dictionary.selfRef().release();
assertThatThrownBy(() -> dictionary.ref())
- .as("Should not be able to get reference after close")
+ .as("Should not be able to get reference after release")
.isInstanceOf(AssertionError.class);
Ref<ZstdCompressionDictionary> tryRef = dictionary.tryRef();
assertThat(tryRef)
- .as("tryRef should return null after close")
+ .as("tryRef should return null after release")
.isNull();
}
@@ -355,8 +387,8 @@ public class ZstdCompressionDictionaryTest
.isNotNull()
.isEqualTo(dict2);
- dict1.close();
- dict2.close();
+ dict1.selfRef().release();
+ dict2.selfRef().release();
}
@Test
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
index 560a0af635..801a07c151 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java
@@ -19,14 +19,21 @@
package org.apache.cassandra.io.compress;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compression.CompressionDictionary;
+import org.apache.cassandra.db.compression.CompressionDictionaryCache;
+import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.schema.CompressionParams;
+import static org.apache.cassandra.Util.spinAssertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class CompressionMetadataTest
{
@@ -35,6 +42,12 @@ public class CompressionMetadataTest
long dataLength = 1000;
long compressedFileLength = 100;
+ @BeforeClass
+ public static void setUpClass()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
private CompressionMetadata newCompressionMetadata(Memory memory)
{
return new CompressionMetadata(chunksIndexFile,
@@ -76,4 +89,170 @@ public class CompressionMetadataTest
assertThat(copy.isCleanedUp()).isTrue();
assertThatExceptionOfType(AssertionError.class).isThrownBy(memory::size);
}
+
+ /**
+     * Test that simulates the Zstd with dictionary → LZ4 schema change scenario. (CASSANDRA-21047)
+ */
+ @Test
+ public void testDictionaryRefCountingDuringSchemaChange()
+ {
+        // Step 1: Create a dictionary and add it to cache (simulating table with Zstd compression)
+        CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.ZSTD, 1L);
+        byte[] dictBytes = "sample dictionary data for compression".getBytes();
+        ZstdCompressionDictionary dictionary = new ZstdCompressionDictionary(dictId, dictBytes);
+
+ assertThat(dictionary.selfRef().globalCount()).isOne();
+ CompressionDictionaryCache cache = new CompressionDictionaryCache();
+ cache.add(dictionary);
+
+ // Verify dictionary is in cache
+ CompressionDictionary cachedDict = cache.get(dictId);
+ assertThat(cachedDict)
+ .as("Dictionary should be cached")
+ .isNotNull()
+ .isSameAs(dictionary);
+ assertThat(dictionary.selfRef().globalCount()).isOne();
+
+        // Step 2: Open SSTable with dictionary (simulating CompressionMetadata)
+ Memory memory = Memory.allocate(100);
+ CompressionMetadata metadata = new CompressionMetadata(
+ chunksIndexFile,
+ params,
+ memory,
+ memory.size(),
+ dataLength,
+ compressedFileLength,
+ dictionary // CompressionMetadata takes a reference
+ );
+ // RC is incremented
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+
+ // Verify we can get the compressor (which uses the dictionary)
+ ICompressor compressor = metadata.compressor();
+ assertThat(compressor)
+ .as("Should be able to get compressor with dictionary")
+ .isNotNull();
+ // RC is incremented
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+
+        // Step 3: Schema change - close cache (simulating switch from Zstd to LZ4)
+ // This releases the cache's reference to the dictionary
+ cache.close();
+ // RC is decremented
+ spinAssertEquals(2, () -> dictionary.selfRef().globalCount());
+
+ // Step 4: Verify dictionary is still usable via CompressionMetadata
+        // This is the key test - before the fix, this would fail with "Dictionary has been closed"
+        // compressor() returns the cached compressor and does not change the reference count
+ ICompressor compressorAfterCacheClosed = metadata.compressor();
+ assertThat(compressorAfterCacheClosed)
+ .as("Compressor should still be accessible after cache is closed")
+ .isNotNull()
+ .isSameAs(compressor); // Should return the same cached compressor
+
+ // Verify dictionary methods still work
+ assertThat(dictionary.dictionaryForDecompression())
+ .as("Dictionary decompression should still work")
+ .isNotNull();
+
+        // Step 5: Close CompressionMetadata (simulating SSTable being compacted away)
+ metadata.close();
+ // RC is decremented
+ assertThat(dictionary.selfRef().globalCount()).isOne();
+
+        // Step 6: Dictionary is still usable because ZstdDictionaryCompressor cache holds a reference
+        // This is expected behavior - the compressor cache is global and may keep dictionaries
+        // alive for reuse across SSTables
+ assertThat(dictionary.dictionaryForDecompression())
+ .as("Dictionary should still work due to compressor cache")
+ .isNotNull();
+
+ ZstdDictionaryCompressor.invalidateCache();
+ // RC is decremented
+ spinAssertEquals(0, () -> dictionary.selfRef().globalCount());
+ assertThatThrownBy(dictionary::dictionaryForDecompression)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Dictionary has been released");
+ }
+
+ /**
+ * Test multiple CompressionMetadata instances sharing the same dictionary
+ * and surviving cache eviction. (CASSANDRA-21047)
+ */
+ @Test
+ public void testMultipleMetadataInstancesSharingDictionary()
+ {
+ // Create dictionary and cache
+        CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.ZSTD, 2L);
+        byte[] dictBytes = "shared dictionary data".getBytes();
+        ZstdCompressionDictionary dictionary = new ZstdCompressionDictionary(dictId, dictBytes);
+
+ assertThat(dictionary.selfRef().globalCount()).isOne();
+ CompressionDictionaryCache cache = new CompressionDictionaryCache();
+ cache.add(dictionary);
+
+        // Create multiple CompressionMetadata instances (simulating multiple SSTables)
+ Memory memory1 = Memory.allocate(100);
+ CompressionMetadata metadata1 = new CompressionMetadata(
+ chunksIndexFile, params, memory1, memory1.size(),
+ dataLength, compressedFileLength, dictionary
+ );
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+
+ Memory memory2 = Memory.allocate(100);
+ CompressionMetadata metadata2 = new CompressionMetadata(
+ chunksIndexFile, params, memory2, memory2.size(),
+ dataLength, compressedFileLength, dictionary
+ );
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+
+ Memory memory3 = Memory.allocate(100);
+ CompressionMetadata metadata3 = new CompressionMetadata(
+ chunksIndexFile, params, memory3, memory3.size(),
+ dataLength, compressedFileLength, dictionary
+ );
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(4);
+
+ // All should be able to get compressors
+ assertThat(metadata1.compressor()).isNotNull();
+ assertThat(metadata2.compressor()).isNotNull();
+ assertThat(metadata3.compressor()).isNotNull();
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(5);
+
+ // Close cache (schema change)
+ cache.close();
+ spinAssertEquals(4, () -> dictionary.selfRef().globalCount());
+
+ // All metadata instances should still work
+ assertThat(metadata1.compressor()).isNotNull();
+ assertThat(metadata2.compressor()).isNotNull();
+ assertThat(metadata3.compressor()).isNotNull();
+
+ // Close metadata instances one by one
+ metadata1.close();
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(3);
+ assertThat(metadata2.compressor())
+ .as("Other metadata should still work")
+ .isNotNull();
+ assertThat(metadata3.compressor())
+ .as("Other metadata should still work")
+ .isNotNull();
+
+ metadata2.close();
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(2);
+ assertThat(metadata3.compressor())
+ .as("Last metadata should still work")
+ .isNotNull();
+
+ // Close last instance - now dictionary should be released
+ metadata3.close();
+ assertThat(dictionary.selfRef().globalCount()).isEqualTo(1);
+
+ ZstdDictionaryCompressor.invalidateCache();
+ // RC is decremented
+ spinAssertEquals(0, () -> dictionary.selfRef().globalCount());
+ assertThatThrownBy(dictionary::dictionaryForDecompression)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Dictionary has been released");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]