This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b08da36770 Change the eager reference counting of compression dictionaries to lazy
b08da36770 is described below
commit b08da367708763d34b399a77823a0433a4160347
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Dec 15 12:57:08 2025 +0100
Change the eager reference counting of compression dictionaries to lazy
patch by Stefan Miklosovic; reviewed by Yifan Cai, Jyothsna Konisa for CASSANDRA-21074
---
CHANGES.txt | 1 +
.../db/compression/CompressionDictionary.java | 35 ++++++++++++++++++----
.../db/compression/CompressionDictionaryCache.java | 14 +++++++--
.../db/compression/ZstdCompressionDictionary.java | 28 ++++++++++++++---
.../CompressionDictionaryCacheTest.java | 7 ++++-
.../compression/ZstdCompressionDictionaryTest.java | 10 ++-----
6 files changed, 75 insertions(+), 20 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c4ecfb016c..ebafbf08da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Change the eager reference counting of compression dictionaries to lazy (CASSANDRA-21074)
* Add cursor based optimized compaction path (CASSANDRA-20918)
* Ensure peers with LEFT status are expired from gossip state (CASSANDRA-21035)
* Optimize UTF8Validator.validate for ASCII prefixed Strings (CASSANDRA-21075)
diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index fff6255c3f..a6c78da497 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -36,7 +36,16 @@ import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
import org.apache.cassandra.utils.concurrent.Ref;
/**
- * Interface for compression dictionaries with reference-counted lifecycle management.
+ * Interface for compression dictionaries with opt-in reference-counted lifecycle management.
+ * <p>
+ * Dictionaries can be used in two modes:
+ * <ul>
+ * <li><b>Lightweight mode</b>: No native resources allocated. Suitable for export, serialization,
+ * or scenarios where only the raw dictionary bytes are needed.</li>
+ * <li><b>Managed mode</b>: Native compression/decompression resources allocated on-demand and
+ * managed via reference counting. Required for caching and active use.</li>
+ * </ul>
+ * Call {@link #initRefLazily()} or {@link #tryRef()} to transition from lightweight to managed mode.
*
* <h2>Reference Counting Model</h2>
* Compression dictionaries hold native resources that must be explicitly managed. This interface
@@ -109,6 +118,13 @@ public interface CompressionDictionary
return dictId().kind;
}
+ /**
+ * Returns a reference from lazily initialized reference counter.
+ *
+ * @return reference to this dictionary; once initialized, the reference is the same as self-reference
+ */
+ Ref<? extends CompressionDictionary> initRefLazily();
+
/**
* Try to acquire a new reference to this dictionary.
* Returns null if the dictionary is already released.
@@ -125,9 +141,11 @@ public interface CompressionDictionary
/**
* Get the self-reference of this dictionary.
* This is used to release the primary reference held by the cache.
+ * Self-reference is initialized after initRefLazily or tryRef
*
- * @return the self-reference
+ * @return the self-reference or null if not yet initialized.
*/
+ @Nullable
Ref<? extends CompressionDictionary> selfRef();
/**
@@ -137,9 +155,12 @@ public interface CompressionDictionary
* This method is idempotent - calling it multiple times is safe and will only
* release the self-reference once. Subsequent calls have no effect.
* <p>
- * This method is typically used when creating a dictionary outside the cache
- * (e.g., in tests or temporary usage) and needing to clean it up. For dictionaries
- * managed by the cache, the cache's removal listener handles cleanup via
+ * There is no need to call this method in the context of releasing references when an instance
+ * is never put to a cache. That might be the case when we are constructing a dictionary object
+ * just for the purpose of passing it to a user (e.g. on exporting and similar) where reference counting
+ * is not necessary nor needed.
+ * <p>
+ * For dictionaries managed by the cache, the cache's removal listener handles cleanup via
* {@code selfRef().release()}.
*
* @see #selfRef()
@@ -148,7 +169,9 @@ public interface CompressionDictionary
@VisibleForTesting
default void close()
{
- selfRef().close();
+ Ref<?> selfRef = selfRef();
+ if (selfRef != null)
+ selfRef.close();
}
/**
diff --git a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
index a58bd0e386..126341e925 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java
@@ -32,6 +32,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
+import org.apache.cassandra.utils.concurrent.Ref;
/**
* Manages caching and current dictionary state for compression dictionaries.
@@ -68,7 +69,9 @@ public class CompressionDictionaryCache implements ICompressionDictionaryCache
{
try
{
- dictionary.selfRef().release();
+ // The dictionary's selfRef should never be null when evicting from cache
+ // but using close() to have better resiliency
+ dictionary.close();
}
catch (Exception e)
{
@@ -102,7 +105,14 @@ public class CompressionDictionaryCache implements ICompressionDictionaryCache
// Only update cache if not already in the cache
DictId newDictId = compressionDictionary.dictId();
- cache.get(newDictId, id -> compressionDictionary);
+ cache.get(newDictId, id -> {
+ Ref<?> ref = compressionDictionary.initRefLazily();
+ if (ref == null)
+ {
+ throw new IllegalStateException("Failed to acquire reference
to compression dictionary");
+ }
+ return compressionDictionary;
+ });
// Update current dictionary if we don't have one or the new one has a higher ID (newer)
DictId currentId = currentDictId.get();
diff --git a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
index ed83a8a4d5..a1af1ce9cc 100644
--- a/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java
@@ -43,7 +43,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
// One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be derived from the same raw dictionary content
private final ConcurrentHashMap<Integer, ZstdDictCompress>
zstdDictCompressPerLevel = new ConcurrentHashMap<>();
private final AtomicReference<ZstdDictDecompress> dictDecompress = new
AtomicReference<>();
- private final Ref<ZstdCompressionDictionary> selfRef;
+ private volatile Ref<ZstdCompressionDictionary> selfRef;
@VisibleForTesting
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary)
@@ -58,7 +58,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
this.dictId = dictId;
this.rawDictionary = rawDictionary;
this.checksum = checksum;
- this.selfRef = new Ref<>(this, new Tidy(zstdDictCompressPerLevel,
dictDecompress));
+ this.selfRef = null;
}
@Override
@@ -180,7 +180,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
@Override
public Ref<ZstdCompressionDictionary> tryRef()
{
- return selfRef.tryRef();
+ return initRefLazily().tryRef();
}
@Override
@@ -192,11 +192,31 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
@Override
public Ref<ZstdCompressionDictionary> ref()
{
- return selfRef.ref();
+ return initRefLazily().ref();
+ }
+
+ @Override
+ public Ref<ZstdCompressionDictionary> initRefLazily()
+ {
+ if (selfRef == null)
+ {
+ synchronized (this)
+ {
+ if (selfRef == null)
+ {
+ selfRef = new Ref<>(this, new
Tidy(zstdDictCompressPerLevel, dictDecompress));
+ }
+ }
+ }
+ return selfRef;
}
private void ensureNotReleased()
{
+ if (selfRef == null)
+ throw new IllegalStateException("Dictionary ref is not
initialized. " +
+ "Call initRefLazily() or tryRef()
first: " + dictId);
+
if (selfRef.globalCount() <= 0)
throw new IllegalStateException("Dictionary has been released: " +
dictId);
}
diff --git a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
index 46565b7c04..bd76e5137d 100644
--- a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import com.github.luben.zstd.ZstdDictTrainer;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.concurrent.Ref;
import static org.apache.cassandra.db.compression.CompressionDictionary.DictId;
import static org.apache.cassandra.db.compression.CompressionDictionary.Kind;
@@ -427,7 +428,11 @@ public class CompressionDictionaryCacheTest
{
try
{
- resource.selfRef().release();
+ Ref<ZstdCompressionDictionary> selfRef = resource.selfRef();
+ if (selfRef != null && selfRef.globalCount() > 0)
+ {
+ selfRef.release();
+ }
}
catch (Exception e)
{
diff --git a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
index 2b226eed4d..8c04a7d03f 100644
--- a/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
+++ b/test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java
@@ -61,6 +61,7 @@ public class ZstdCompressionDictionaryTest
public void setUp()
{
dictionary = new ZstdCompressionDictionary(SAMPLE_DICT_ID,
SAMPLE_DICT_DATA);
+ dictionary.initRefLazily();
}
@Test
@@ -81,9 +82,6 @@ public class ZstdCompressionDictionaryTest
assertThat(dictionary)
.as("Dictionaries with different IDs should not be equal")
.isNotEqualTo(differentIdDict);
-
- dictionary2.selfRef().release();
- differentIdDict.selfRef().release();
}
@Test
@@ -187,6 +185,8 @@ public class ZstdCompressionDictionaryTest
SAMPLE_DICT_DATA
);
+ testDict.initRefLazily();
+
// Access some dictionaries first to initialize them
testDict.dictionaryForCompression(3);
testDict.dictionaryForDecompression();
@@ -260,7 +260,6 @@ public class ZstdCompressionDictionaryTest
@Test
public void testReferenceAfterClose()
{
- // Release the self-reference
dictionary.selfRef().release();
assertThatThrownBy(() -> dictionary.ref())
@@ -386,9 +385,6 @@ public class ZstdCompressionDictionaryTest
.as("Both deserializations should return identical dictionary")
.isNotNull()
.isEqualTo(dict2);
-
- dict1.selfRef().release();
- dict2.selfRef().release();
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]